# 处理开门红通报数据,自动摘录相关数据和通报。
# Author: subk
# Time: 2024/1/10 16:53
# Desc: 处理25年集团市场开门红通报数据
# Version: 1.0
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
import re
import schedule
import time
# User configuration: SMTP account, mail server and recipient list used when
# sending the report e-mail.
# NOTE(review): the credentials are hard-coded placeholders — consider loading
# them from the environment or an untracked config file instead.
username = "su@.XXXX.com"
password = "XXXXXX"
smtp_server = "smtp.XXX"
smtp_port = 465 # SSL port
recipients = [""] # recipient list
def get_dates(now=None):
    """
    Return today's date and yesterday's date as formatted strings.

    Args:
        now (datetime, optional): Reference moment. When omitted, the current
            local time (``datetime.now()``) is used.

    Returns:
        tuple: ``(today, yesterday)`` where ``today`` is formatted as
            ``YYYY年MM月DD日`` and ``yesterday`` as ``YYYYMMDD``.
    """
    # Sample the clock once so both values derive from the same instant; two
    # separate now() calls could straddle midnight and disagree by a day.
    if now is None:
        now = datetime.now()
    today = now.strftime("%Y年%m月%d日")
    yesterday = (now - timedelta(days=1)).strftime("%Y%m%d")
    return today, yesterday
def fetch_web_content(url, timeout=30):
    """
    Download a web page and return its HTML.

    Args:
        url (str): Address of the page to fetch.
        timeout (float | tuple): Seconds to wait for the server, forwarded to
            ``requests.get``. Without a timeout a stalled connection would
            block the caller indefinitely.

    Returns:
        str: The response body decoded as text.

    Raises:
        SystemExit: If the request fails, times out, or returns an HTTP error
            status. The error is printed before exiting with status 1.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"获取网页内容失败: {e}")
        # Raise SystemExit directly: the ``exit`` builtin is injected by the
        # ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
        raise SystemExit(1) from e
    return response.text
def parse_html(html_content, districts, match_list):
    """
    Parse the report HTML and collect the flagged weak items per district.

    Every ``<div class="title_content">`` is scanned for ``<font color="green">``
    elements. For each green font whose text contains a district name, all
    numbered items in that div of the form ``<digits>、<item>开门红`` are
    credited to that district.

    Args:
        html_content (str): HTML of the report page.
        districts (list): District names to look for.
        match_list (list): Keywords identifying the tracked items; an item
            counts as a match when it contains any of them.

    Returns:
        dict: Maps each district to ``{"count": int, "content": list,
            "match_count": int}`` — number of items, the item texts, and how
            many of the items contain a tracked keyword.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    # Non-greedy capture up to the literal "开门红". The previous pattern used
    # the character class [^开门红], which rejects any item containing one of
    # the characters 开, 门 or 红 on its own (e.g. "新开户数"), silently
    # dropping such items. DOTALL keeps the old ability to span line breaks.
    item_pattern = re.compile(r"\d+、(.*?)开门红", re.DOTALL)
    district_data = {
        district: {"count": 0, "content": [], "match_count": 0}
        for district in districts
    }
    for div in soup.find_all("div", class_="title_content"):
        green_fonts = div.find_all("font", color="green")
        if not green_fonts:
            continue
        # The div text does not depend on the font/district being examined, so
        # extract its items once instead of once per (font, district) pair.
        items = item_pattern.findall(div.get_text())
        if not items:
            continue
        tracked_count = sum(
            1 for item in items
            if any(keyword in item for keyword in match_list)
        )
        for font in green_fonts:
            font_text = font.get_text()
            for district in districts:
                if district in font_text:
                    stats = district_data[district]
                    stats["content"].extend(items)
                    stats["count"] += len(items)
                    stats["match_count"] += tracked_count
    return district_data
def build_stats_table(districts, district_data, match_list):
    """
    Build the HTML statistics block: the tracked-item list plus a table with
    one column per district.

    The cells holding the largest weak-item count and the largest match count
    are highlighted in light green (every tied cell is highlighted).

    Args:
        districts (list): District names, in column order.
        district_data (dict): Per-district stats with the keys ``count``,
            ``match_count`` and ``content`` (a list of strings).
        match_list (list): Tracked keywords, rendered as a bullet list.

    Returns:
        str: HTML fragment containing the bullet list and the table.

    Raises:
        KeyError: If a name in ``districts`` is missing from ``district_data``.
    """
    match_list_html = "".join(f"<li>{item}</li>" for item in match_list)
    district_columns = "".join(f"<th>{district}</th>" for district in districts)
    # default=0 keeps an empty district_data from raising ValueError in max().
    max_weakness_count = max(
        (stats["count"] for stats in district_data.values()), default=0
    )
    max_match_count = max(
        (stats["match_count"] for stats in district_data.values()), default=0
    )
    highlight = 'style="background-color: lightgreen;"'
    count_cells = []
    match_count_cells = []
    content_cells = []
    for district in districts:
        stats = district_data[district]
        count_style = highlight if stats["count"] == max_weakness_count else ''
        match_count_style = highlight if stats["match_count"] == max_match_count else ''
        count_cells.append(f"<td {count_style}>{stats['count']}</td>")
        match_count_cells.append(f"<td {match_count_style}>{stats['match_count']}</td>")
        content_cells.append(f"<td>{'<br>'.join(stats['content'])}</td>")
    # NOTE(review): the "11" in the heading below is hard-coded and does not
    # follow len(match_list).
    stats_info = """
<div>
<p><b>近期11项重点跟踪的产能项目:</b></p>
<ul>
{}
</ul>
<p><b>统计信息:</b></p>
<table border="1" cellpadding="10" cellspacing="0" style="border-collapse: collapse; width: 100%; text-align: center;">
<thead style="background-color: #0085d0; color: white;">
<tr>
<th><b>区县</b></th>
{}
</tr>
</thead>
<tbody>
<tr style="background-color: #f9f9f9;">
<td><b>通报弱项数</b></td>
{}
</tr>
<tr style="background-color: #f9f9f9;">
<td><b>产能跟踪中的匹配数</b></td>
{}
</tr>
<tr style="background-color: #f9f9f9;">
<td><b>通报的具体弱项</b></td>
{}
</tr>
</tbody>
</table>
<br><br><br>
</div>
""".format(
        match_list_html,
        district_columns,
        "".join(count_cells),
        "".join(match_count_cells),
        "".join(content_cells),
    )
    return stats_info
def extract_district_data(soup):
    """
    Read the per-district total score and total rank from the report table.

    The table is looked up as ``<div class="html_table">``. Its first row
    supplies the district names (``<th>`` cells), the second row the scores
    and the third row the ranks (``<td>`` cells). The first cell of each row
    is skipped.

    Args:
        soup (BeautifulSoup): Parsed HTML document.

    Returns:
        dict: Maps district name to ``{"score": float, "rank": int}``.

    Raises:
        ValueError: If the table is missing, has fewer than three rows, the
            three rows disagree on column count, or a score/rank cell is not
            numeric.
    """
    container = soup.find("div", class_="html_table")
    if not container:
        raise ValueError("未找到目标表格")

    table_rows = container.find_all("tr")
    if len(table_rows) < 3:
        raise ValueError("表格行数不足")

    def _values(row, tag):
        # Stripped text of every `tag` cell in the row except the first one.
        return [cell.text.strip() for cell in row.find_all(tag)[1:]]

    names = _values(table_rows[0], "th")
    score_texts = _values(table_rows[1], "td")
    rank_texts = _values(table_rows[2], "td")

    # All three rows must describe the same set of columns.
    if not (len(names) == len(score_texts) == len(rank_texts)):
        raise ValueError("表格列数不一致")

    return {
        name: {"score": float(score), "rank": int(rank)}
        for name, score, rank in zip(names, score_texts, rank_texts)
    }
def highlight_text(html, keywords):
    """
    Wrap occurrences of the keyword ``LS`` in an orange-background span.

    Only keywords equal to ``"LS"`` trigger a replacement; every other entry
    in ``keywords`` is ignored.

    Args:
        html (str): HTML text to process.
        keywords (list): Candidate keywords.

    Returns:
        str: The HTML with the replacement applied once per ``"LS"`` entry in
            ``keywords``.
    """
    for keyword in keywords:
        if keyword != "LS":
            continue
        marked = f'<span style="background-color: orange;">{keyword}</span>'
        html = html.replace(keyword, marked)
    return html
def send_email(subject, body, recipients, username, password, smtp_server, smtp_port, timeout=30):
    """
    Send an HTML e-mail over an SSL SMTP connection.

    Args:
        subject (str): Subject line.
        body (str): Message body, sent as HTML.
        recipients (list): Recipient addresses.
        username (str): Login name, also used as the ``From`` address.
        password (str): Login password.
        smtp_server (str): SMTP server host.
        smtp_port (int): SMTP server port (an SSL port, since ``SMTP_SSL`` is
            used).
        timeout (float): Seconds to wait on blocking socket operations, so a
            stalled server cannot hang the caller forever.

    Returns:
        bool: ``True`` if the message was handed to the server, ``False`` if
            an SMTP error occurred (the error is printed). Previously the
            failure was only printed, leaving callers unable to detect it.

    Raises:
        OSError: Connection-level failures (DNS, refused connection, timeout)
            are not SMTP errors and still propagate to the caller.
    """
    message = MIMEMultipart()
    message["From"] = username
    message["To"] = ", ".join(recipients)
    message["Subject"] = subject
    message.attach(MIMEText(body, "html"))
    try:
        with smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=timeout) as server:
            server.login(username, password)
            server.sendmail(username, recipients, message.as_string())
    except smtplib.SMTPException as e:
        print(f"发送邮件失败: {e}")
        return False
    print("邮件发送成功!")
    return True
def job():
    """
    Run one report cycle: fetch yesterday's report, build the summary and
    e-mail it.

    Returns:
        bool: ``True`` when the cycle completed, ``False`` on any failure
            (including the report data not being published yet), so the
            caller can retry later.
    """
    focus_district = "涟水"
    try:
        _, yesterday = get_dates()
        url = f"http://10.33.222.52:31002/hamobile/table/mailReport?cfg_id=202&flag=jtReport2&day_time={yesterday}"
        html_content = fetch_web_content(url)
        soup = BeautifulSoup(html_content, "html.parser")
        # Extraction raises if the score table is absent, which is treated as
        # "the data has not been published yet".
        district_scores = extract_district_data(soup)
        districts = ["XX", "XX"]
        match_list = [
            "直管木本资源销售", "两线新增条数", "商客市场计费宽带新增", "政企新入网",
            "触达类大数据产品发送量", "云视讯新增终端数", "和对讲新增终端数", "电子学生证用户新增",
            "视联网新增", "FTTO新增", "24年及以前存量欠费回收"
        ]
        district_data = parse_html(html_content, districts, match_list)
        stats_info = build_stats_table(districts, district_data, match_list)

        # Score and rank of the focus district; "N/A" when it is not listed.
        focus_scores = district_scores.get(focus_district, {})
        total_score = focus_scores.get("score", "N/A")
        total_rank = focus_scores.get("rank", "N/A")

        # Districts ranked ahead of the focus district. Comparing against the
        # "N/A" placeholder would raise TypeError, so only compare when a real
        # rank is available.
        if isinstance(total_rank, int):
            units_before_sorted = sorted(
                (
                    district for district, data in district_scores.items()
                    if data["rank"] < total_rank
                ),
                key=lambda district: district_scores[district]["rank"],
            )
        else:
            units_before_sorted = []

        # Weak-item and match counts for the focus district; fall back to
        # "N/A" (as the score lookup does) instead of raising KeyError when
        # it is not among the parsed districts.
        focus_stats = district_data.get(focus_district, {})
        lian_shui_weakness_count = focus_stats.get("count", "N/A")
        lian_shui_match_count = focus_stats.get("match_count", "N/A")

        # First district holding the largest match count.
        max_match_count = max(
            (data["match_count"] for data in district_data.values()), default=0
        )
        max_match_district = next(
            (
                district for district, data in district_data.items()
                if data["match_count"] == max_match_count
            ),
            "N/A",
        )

        # Summary paragraph placed at the top of the mail.
        additional_info = f"""
<div>
<p><b>LS核心产品总得分:{total_score},总排名:{total_rank},排名在LS之前的单位:{', '.join(units_before_sorted)}</b></p>
<p><b>LS通报弱项数:{lian_shui_weakness_count},产能跟踪中的匹配数:{lian_shui_match_count}(匹配数最多:{max_match_district})</b></p>
</div>
"""
        html_content = highlight_text(html_content, districts)
        html_content = additional_info + stats_info + html_content
        subject = f'【抢先版】集团市场"开门红"专项营销活动通报({yesterday})'
        sent = send_email(subject, html_content, recipients, username, password, smtp_server, smtp_port)
        # Only an explicit False signals a failed send; a None result (no
        # status reported) is treated as success.
        if sent is False:
            return False
        return True
    except (Exception, SystemExit) as e:
        # SystemExit is caught as well because fetch_web_content exits on a
        # network error; without this a transient failure would terminate
        # the polling loop instead of being retried.
        print(f"执行失败: {e}")
        return False
def main():
    """
    Poll once a minute and run ``job`` at most once successfully per day.

    From 15:00 local time onwards ``job`` is attempted on every pass until it
    reports success; the "done" flag is cleared when the calendar date
    changes. ``schedule.run_pending()`` is invoked on each pass, although this
    function registers no jobs with ``schedule`` itself. The loop never
    returns.
    """
    current_day = datetime.now().date()
    done_for_today = False  # whether job() has already succeeded today
    while True:
        checked_at = datetime.now()
        due = checked_at.hour >= 15 and not done_for_today
        if due and job():
            done_for_today = True
            print(f"任务在 {checked_at} 成功执行,今天的任务已完成")

        # Run any pending scheduled jobs, then wait a minute before re-checking.
        schedule.run_pending()
        time.sleep(60)

        # Nothing to reset until the calendar date has moved on.
        if datetime.now().date() <= current_day:
            continue
        current_day = datetime.now().date()
        done_for_today = False
        print(f"日期已更新为 {current_day},重置任务执行标志")


if __name__ == "__main__":
    main()