开门红自动统计代码

处理开门红通报数据,自动摘录相关数据和通报。

# Author: subk
# Time: 2024/1/10 16:53
# Desc: 处理25年集团市场开门红通报数据
# Version: 1.0

import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
import re
import schedule
import time

# 用户配置
username = "su@.XXXX.com"
password = "XXXXXX"
smtp_server = "smtp.XXX"
smtp_port = 465  # SSL端口
recipients = [""]  # 收件人列表

def get_dates():
    """
    获取今天的日期和昨天的日期。

    Returns:
        tuple: 包含今天日期(格式:YYYY年MM月DD日)和昨天日期(格式:YYYYMMDD)的元组。
    """
    today = datetime.now().strftime("%Y年%m月%d日")
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
    return today, yesterday

def fetch_web_content(url):
    """
    根据给定的URL获取网页内容。

    Args:
        url (str): 网页的URL地址。

    Returns:
        str: 网页的HTML内容。

    Raises:
        SystemExit: 如果请求失败,则打印错误信息并退出程序。
    """
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"获取网页内容失败: {e}")
        exit(1)

def parse_html(html_content, districts, match_list):
    """
    解析HTML内容,提取各地区的通报弱项数、匹配数和具体内容。

    Args:
        html_content (str): 网页的HTML内容。
        districts (list): 区县名称列表。
        match_list (list): 需要匹配的关键字列表。

    Returns:
        dict: 包含各区县统计信息的字典。
    """
    soup = BeautifulSoup(html_content, "html.parser")
    title_contents = soup.find_all("div", class_="title_content")

    pattern = r"\d+、([^开门红]*)开门红"
    district_data = {district: {"count": 0, "content": [], "match_count": 0} for district in districts}

    for div in title_contents:
        green_fonts = div.find_all("font", color="green")
        for font in green_fonts:
            for district in districts:
                if district in font.get_text():
                    text_content = div.get_text()
                    matched_content = re.findall(pattern, text_content)
                    if matched_content:
                        district_data[district]["content"].extend(matched_content)
                        district_data[district]["count"] += len(matched_content)
                        for item in matched_content:
                            if any(match_item in item for match_item in match_list):
                                district_data[district]["match_count"] += 1
    return district_data

def build_stats_table(districts, district_data, match_list):
    """
    构建统计信息的HTML表格,并高亮显示最大值所在的单元格。

    Args:
        districts (list): 区县名称列表。
        district_data (dict): 各区县的统计信息字典。
        match_list (list): 需要匹配的关键字列表。

    Returns:
        str: 包含统计信息的HTML字符串。
    """
    match_list_html = "".join(f"<li>{item}</li>" for item in match_list)
    district_columns = "".join(f"<th>{district}</th>" for district in districts)

    # 找到通报弱项数的最大值
    max_weakness_count = max(district_data.values(), key=lambda x: x["count"])["count"]
    max_match_count = max(district_data.values(), key=lambda x: x["match_count"])["match_count"]

    count_values = ""
    match_count_values = ""
    content_values = ""

    for district in districts:
        count_style = 'style="background-color: lightgreen;"' if district_data[district]["count"] == max_weakness_count else ''
        match_count_style = 'style="background-color: lightgreen;"' if district_data[district]["match_count"] == max_match_count else ''
        count_values += f"<td {count_style}>{district_data[district]['count']}</td>"
        match_count_values += f"<td {match_count_style}>{district_data[district]['match_count']}</td>"
        content_values += f"<td>{'<br>'.join(district_data[district]['content'])}</td>"

    stats_info = """
        <div>
            <p><b>近期11项重点跟踪的产能项目:</b></p>
            <ul>
                {}
            </ul>
            <p><b>统计信息:</b></p>
            <table border="1" cellpadding="10" cellspacing="0" style="border-collapse: collapse; width: 100%; text-align: center;">
                <thead style="background-color: #0085d0; color: white;">
                    <tr>
                        <th><b>区县</b></th>
                        {}
                    </tr>
                </thead>
                <tbody>
                    <tr style="background-color: #f9f9f9;">
                        <td><b>通报弱项数</b></td>
                        {}
                    </tr>
                    <tr style="background-color: #f9f9f9;">
                        <td><b>产能跟踪中的匹配数</b></td>
                        {}
                    </tr>
                    <tr style="background-color: #f9f9f9;">
                        <td><b>通报的具体弱项</b></td>
                        {}
                    </tr>
                </tbody>
            </table>
            <br><br><br>
        </div>
    """.format(match_list_html, district_columns, count_values, match_count_values, content_values)
    return stats_info

def extract_district_data(soup):
    """
    从HTML中提取区县的得分和排名数据。

    Args:
        soup (BeautifulSoup): 解析后的HTML对象。

    Returns:
        dict: 包含区县得分和排名的数据字典。

    Raises:
        ValueError: 如果无法找到目标表格或表格结构不符合预期。
    """
    html_table = soup.find("div", class_="html_table")
    if not html_table:
        raise ValueError("未找到目标表格")

    rows = html_table.find_all("tr")
    if len(rows) < 3:
        raise ValueError("表格行数不足")

    headers = [th.text.strip() for th in rows[0].find_all("th")[1:]]  # 区县名称(跳过"区县"列)
    scores = [td.text.strip() for td in rows[1].find_all("td")[1:]]  # 总得分(从第二列开始)
    ranks = [td.text.strip() for td in rows[2].find_all("td")[1:]]  # 总排名(从第二列开始)

    # 检查是否所有列都有数据
    if len(headers) != len(scores) or len(headers) != len(ranks):
        raise ValueError("表格列数不一致")

    # 构建区县数据
    district_data = {
        headers[i]: {"score": float(scores[i]), "rank": int(ranks[i])}
        for i in range(len(headers))
    }
    return district_data

def highlight_text(html, keywords):
    """
    在HTML内容中高亮显示指定的关键字。

    Args:
        html (str): HTML内容字符串。
        keywords (list): 需要高亮显示的关键字列表。

    Returns:
        str: 高亮显示后的HTML内容字符串。
    """
    for keyword in keywords:
        if keyword == "LS":
            html = html.replace(keyword, f'<span style="background-color: orange;">{keyword}</span>')
    return html

def send_email(subject, body, recipients, username, password, smtp_server, smtp_port):
    """
    发送电子邮件。

    Args:
        subject (str): 邮件主题。
        body (str): 邮件正文(HTML格式)。
        recipients (list): 收件人列表。
        username (str): 发件人用户名。
        password (str): 发件人密码。
        smtp_server (str): SMTP服务器地址。
        smtp_port (int): SMTP服务器端口号。

    Raises:
        Exception: 如果发送邮件失败,则打印错误信息。
    """
    message = MIMEMultipart()
    message["From"] = username
    message["To"] = ", ".join(recipients)
    message["Subject"] = subject

    message.attach(MIMEText(body, "html"))

    try:
        with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
            server.login(username, password)
            server.sendmail(username, recipients, message.as_string())
            print("邮件发送成功!")
    except smtplib.SMTPException as e:
        print(f"发送邮件失败: {e}")

def job():
    """
    执行主任务的函数,如果成功则返回True,失败返回False
    """
    try:
        today, yesterday = get_dates()
        url = f"http://10.33.222.52:31002/hamobile/table/mailReport?cfg_id=202&flag=jtReport2&day_time={yesterday}"

        html_content = fetch_web_content(url)
        soup = BeautifulSoup(html_content, "html.parser")

        # 尝试提取区县数据,如果失败说明数据还没出
        district_scores = extract_district_data(soup)

        # 如果能执行到这里,说明数据已经出来了
        districts = ["XX", "XX"]
        match_list = [
            "直管木本资源销售", "两线新增条数", "商客市场计费宽带新增", "政企新入网",
            "触达类大数据产品发送量", "云视讯新增终端数", "和对讲新增终端数", "电子学生证用户新增",
            "视联网新增", "FTTO新增", "24年及以前存量欠费回收"
        ]

        district_data = parse_html(html_content, districts, match_list)
        stats_info = build_stats_table(districts, district_data, match_list)

        # 提取区县数据
        lian_shui_data = district_scores.get("涟水", {})
        total_score = lian_shui_data.get("score", "N/A")
        total_rank = lian_shui_data.get("rank", "N/A")

        # 找到排名在LS之前的单位
        units_before = [
            district for district, data in district_scores.items()
            if data["rank"] < total_rank
        ]
        units_before_sorted = sorted(units_before, key=lambda x: district_scores[x]["rank"])

        # LS通报弱项数和产能跟踪中的匹配数
        lian_shui_weakness_count = district_data["涟水"]["count"]
        lian_shui_match_count = district_data["涟水"]["match_count"]

        # 找到产能跟踪中的匹配数最大值对应的区县
        max_match_count = max(district_data.values(), key=lambda x: x["match_count"])["match_count"]
        max_match_district = next((district for district, data in district_data.items() if data["match_count"] == max_match_count), "N/A")

        # 构建邮件开头的新内容
        additional_info = f"""
            <div>
                <p><b>LS核心产品总得分:{total_score},总排名:{total_rank},排名在LS之前的单位:{', '.join(units_before_sorted)}</b></p>
                <p><b>LS通报弱项数:{lian_shui_weakness_count},产能跟踪中的匹配数:{lian_shui_match_count}(匹配数最多:{max_match_district})</b></p>
            </div>
        """

        html_content = highlight_text(html_content, districts)
        html_content = additional_info + stats_info + html_content

        subject = f'【抢先版】集团市场"开门红"专项营销活动通报({yesterday})'

        # 发送成功的邮件
        send_email(subject, html_content, recipients, username, password, smtp_server, smtp_port)

        return True  # 执行成功

    except Exception as e:
        print(f"执行失败: {e}")
        return False  # 执行失败

def main():
    """
    主函数,设置定时任务并立即执行一次任务
    """
    today = datetime.now().date()
    task_executed_today = False  # 标志变量,记录当天是否已经执行过任务

    while True:
        now = datetime.now()
        if not task_executed_today and now.hour >= 15:
            success = job()
            if success:
                task_executed_today = True
                print(f"任务在 {now} 成功执行,今天的任务已完成")

        # 每分钟检查一次是否有待执行的任务
        schedule.run_pending()
        time.sleep(60)

        # 检查是否需要重置任务执行标志
        if datetime.now().date() > today:
            today = datetime.now().date()
            task_executed_today = False
            print(f"日期已更新为 {today},重置任务执行标志")

if __name__ == "__main__":
    main()