第170集:自动化综合项目
一、项目概述
在前面的课程中,我们学习了多种Python自动化技术,包括文件操作自动化、数据处理自动化、网络请求自动化、错误处理自动化、日志处理自动化和定时任务自动化等。本节课,我们将把这些技术整合起来,完成一个完整的自动化综合项目——天气数据自动化收集与分析系统。
1.1 项目目标
本项目旨在创建一个能够自动收集天气数据、进行分析处理、生成可视化报告,并定期发送给用户的自动化系统。通过这个项目,你将学习如何将各种自动化技术整合到一个完整的应用中。
1.2 项目特点
- 全自动化:从数据收集到报告生成完全自动化,无需人工干预
- 模块化设计:系统分为多个功能模块,便于维护和扩展
- 错误处理完善:包含全面的错误处理机制,确保系统稳定运行
- 日志记录详细:记录系统运行的全过程,便于监控和调试
- 可视化展示:生成直观的数据可视化图表,便于数据分析
二、项目需求分析
2.1 功能需求
数据收集模块:
- 自动从天气API获取指定城市的天气数据
- 支持配置多个城市
- 定期执行数据收集任务
数据处理模块:
- 解析和清洗收集到的天气数据
- 存储数据到本地文件或数据库
- 对历史数据进行统计分析
可视化模块:
- 生成天气趋势图表(温度、湿度、降水量等)
- 生成天气状况统计图表
- 支持自定义图表样式
报告生成模块:
- 将分析结果和可视化图表整合到报告中
- 支持多种报告格式(如HTML、PDF等)
通知模块:
- 定期将报告发送给指定用户
- 支持多种通知方式(如邮件、短信等)
配置管理模块:
- 支持通过配置文件或命令行参数配置系统
- 支持动态修改配置
2.2 非功能需求
- 稳定性:系统能够稳定运行,处理各种异常情况
- 可扩展性:支持添加新的数据源和分析功能
- 可维护性:代码结构清晰,便于维护和升级
- 性能:系统运行高效,资源占用合理
- 安全性:保护用户的个人信息和API密钥
三、系统设计
3.1 系统架构
┌─────────────────────────────────────────────────────────────────┐
│ 自动化综合项目 │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ 数据收集模块 │ 数据处理模块 │ 可视化模块 │ 报告生成模块 │ 通知模块 │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ API请求 │ 数据解析 │ 图表生成 │ 报告整合 │ 邮件发送 │
│ 数据获取 │ 数据清洗 │ 数据可视化 │ 格式转换 │ 短信发送 │
│ 定时任务 │ 数据存储 │ │ │ │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘
│ │ │ │ │
└────────────┴──────────────┴──────────────┴────────────┘
│
┌─────────────┐
│ 配置管理模块 │
├─────────────┤
│ 配置文件管理 │
│ 日志管理 │
│ 错误处理 │
└─────────────┘3.2 模块设计
3.2.1 数据收集模块
- 功能:从天气API获取指定城市的天气数据
- 技术点:HTTP请求、API调用、定时任务
- 关键函数:
fetch_weather_data()、schedule_data_collection()
3.2.2 数据处理模块
- 功能:解析、清洗和存储天气数据
- 技术点:JSON解析、数据清洗、文件操作、错误处理
- 关键函数:
parse_weather_data()、clean_data()、save_data()
3.2.3 可视化模块
- 功能:生成天气数据可视化图表
- 技术点:数据可视化、Matplotlib使用
- 关键函数:
generate_temperature_chart()、generate_humidity_chart()
3.2.4 报告生成模块
- 功能:整合分析结果和图表,生成报告
- 技术点:HTML生成、模板使用
- 关键函数:
generate_report()、format_report()
3.2.5 通知模块
- 功能:发送报告给指定用户
- 技术点:邮件发送、SMTP协议
- 关键函数:
send_email()
3.2.6 配置管理模块
- 功能:管理系统配置和日志
- 技术点:配置文件解析、日志处理、错误处理
- 关键函数:
load_config()、setup_logging()、handle_error()
3.3 数据流程
1. 定时触发数据收集任务
2. 从天气API获取原始天气数据
3. 解析和清洗获取到的数据
4. 将处理后的数据存储到本地
5. 对历史数据进行统计分析
6. 生成天气数据可视化图表
7. 整合分析结果和图表生成报告
8. 将报告发送给指定用户
9. 记录整个过程的日志四、项目实现
4.1 技术选型
- 编程语言:Python 3.8+
- 主要依赖库:
requests:用于发送HTTP请求matplotlib:用于数据可视化pandas:用于数据处理和分析schedule:用于定时任务smtplib:用于发送邮件logging:用于日志记录configparser:用于配置文件解析json:用于JSON数据处理os、sys:用于系统操作
4.2 项目结构
weather_automation/
├── config/ # 配置文件目录
│ └── config.ini # 系统配置文件
├── data/ # 数据存储目录
│ ├── raw/ # 原始数据
│ └── processed/ # 处理后的数据
├── logs/ # 日志文件目录
├── reports/ # 报告文件目录
├── charts/ # 图表文件目录
├── src/ # 源代码目录
│ ├── __init__.py # 包初始化文件
│ ├── config_manager.py # 配置管理模块
│ ├── data_collector.py # 数据收集模块
│ ├── data_processor.py # 数据处理模块
│ ├── visualizer.py # 可视化模块
│ ├── report_generator.py # 报告生成模块
│ ├── notification.py # 通知模块
│ └── main.py # 主程序
├── requirements.txt # 依赖库列表
└── README.md # 项目说明文档4.3 代码实现
由于项目代码较长,我们将分模块展示关键代码片段。完整代码请参考配套的演示文件。
4.3.1 配置管理模块 (config_manager.py)
import configparser
import logging
import os
class ConfigManager:
"""配置管理类"""
def __init__(self, config_file='config/config.ini'):
self.config_file = config_file
self.config = None
self.logger = self.setup_logging()
self.load_config()
def setup_logging(self):
"""设置日志记录"""
# 创建日志目录
if not os.path.exists('logs'):
os.makedirs('logs')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='logs/weather_automation.log',
filemode='a'
)
# 创建日志记录器
logger = logging.getLogger('WeatherAutomation')
# 添加控制台输出
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
def load_config(self):
"""加载配置文件"""
try:
self.config = configparser.ConfigParser()
self.config.read(self.config_file, encoding='utf-8')
self.logger.info(f"成功加载配置文件: {self.config_file}")
except Exception as e:
self.logger.error(f"加载配置文件失败: {e}")
raise
def get(self, section, option):
"""获取配置值"""
try:
return self.config.get(section, option)
except Exception as e:
self.logger.error(f"获取配置失败 [{section}.{option}]: {e}")
raise
def get_int(self, section, option):
"""获取整数类型的配置值"""
try:
return self.config.getint(section, option)
except Exception as e:
self.logger.error(f"获取整数配置失败 [{section}.{option}]: {e}")
raise
def get_float(self, section, option):
"""获取浮点数类型的配置值"""
try:
return self.config.getfloat(section, option)
except Exception as e:
self.logger.error(f"获取浮点数配置失败 [{section}.{option}]: {e}")
raise
def get_boolean(self, section, option):
"""获取布尔类型的配置值"""
try:
return self.config.getboolean(section, option)
except Exception as e:
self.logger.error(f"获取布尔配置失败 [{section}.{option}]: {e}")
raise4.3.2 数据收集模块 (data_collector.py)
import requests
import json
import os
from datetime import datetime
from src.config_manager import ConfigManager
class DataCollector:
"""数据收集类"""
def __init__(self, config_manager):
self.config = config_manager
self.logger = self.config.logger
self.api_key = self.config.get('WeatherAPI', 'api_key')
self.city = self.config.get('WeatherAPI', 'city')
self.base_url = self.config.get('WeatherAPI', 'base_url')
# 创建数据目录
if not os.path.exists('data/raw'):
os.makedirs('data/raw')
def fetch_weather_data(self):
"""从天气API获取数据"""
try:
# 构建请求URL
url = f"{self.base_url}?q={self.city}&appid={self.api_key}&units=metric&lang=zh_cn"
# 发送请求
self.logger.info(f"正在获取{self.city}的天气数据")
response = requests.get(url, timeout=10)
response.raise_for_status() # 检查HTTP状态码
# 解析响应数据
data = response.json()
self.logger.info(f"成功获取{self.city}的天气数据")
# 保存原始数据
self.save_raw_data(data)
return data
except requests.exceptions.HTTPError as e:
self.logger.error(f"HTTP错误: {e}")
raise
except requests.exceptions.ConnectionError as e:
self.logger.error(f"连接错误: {e}")
raise
except requests.exceptions.Timeout as e:
self.logger.error(f"超时错误: {e}")
raise
except Exception as e:
self.logger.error(f"获取天气数据失败: {e}")
raise
def save_raw_data(self, data):
"""保存原始数据"""
try:
# 生成文件名(基于当前时间)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"data/raw/weather_{self.city}_{timestamp}.json"
# 保存数据
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
self.logger.info(f"原始数据已保存到: {filename}")
except Exception as e:
self.logger.error(f"保存原始数据失败: {e}")
raise4.3.3 数据处理模块 (data_processor.py)
import json
import os
import pandas as pd
from datetime import datetime
from src.config_manager import ConfigManager
class DataProcessor:
"""数据处理类"""
def __init__(self, config_manager):
self.config = config_manager
self.logger = self.config.logger
# 创建数据目录
if not os.path.exists('data/processed'):
os.makedirs('data/processed')
def parse_weather_data(self, raw_data):
"""解析天气数据"""
try:
self.logger.info("正在解析天气数据")
# 提取基本信息
city = raw_data['name']
country = raw_data['sys']['country']
timestamp = datetime.fromtimestamp(raw_data['dt'])
# 提取当前天气信息
current_weather = raw_data['weather'][0]
weather_main = current_weather['main']
weather_description = current_weather['description']
# 提取温度信息
main = raw_data['main']
temp = main['temp']
temp_min = main['temp_min']
temp_max = main['temp_max']
humidity = main['humidity']
# 提取风速信息
wind_speed = raw_data['wind']['speed']
# 提取云层信息
clouds = raw_data['clouds']['all']
# 提取降水信息
rain = raw_data.get('rain', {}).get('1h', 0)
snow = raw_data.get('snow', {}).get('1h', 0)
# 构建处理后的数据
processed_data = {
'city': city,
'country': country,
'timestamp': timestamp,
'date': timestamp.date(),
'time': timestamp.time(),
'weather_main': weather_main,
'weather_description': weather_description,
'temp': temp,
'temp_min': temp_min,
'temp_max': temp_max,
'humidity': humidity,
'wind_speed': wind_speed,
'clouds': clouds,
'rain_1h': rain,
'snow_1h': snow
}
self.logger.info("天气数据解析完成")
return processed_data
except KeyError as e:
self.logger.error(f"解析天气数据失败,缺少字段: {e}")
raise
except Exception as e:
self.logger.error(f"解析天气数据失败: {e}")
raise
def save_processed_data(self, data):
"""保存处理后的数据"""
try:
self.logger.info("正在保存处理后的数据")
# 生成CSV文件名(按日期)
date_str = data['date'].strftime("%Y%m%d")
filename = f"data/processed/weather_{data['city']}_{date_str}.csv"
# 转换为DataFrame
df = pd.DataFrame([data])
# 保存到CSV文件(如果文件已存在,则追加)
if os.path.exists(filename):
df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8')
else:
df.to_csv(filename, index=False, encoding='utf-8')
self.logger.info(f"处理后的数据已保存到: {filename}")
return filename
except Exception as e:
self.logger.error(f"保存处理后的数据失败: {e}")
raise
def load_historical_data(self, city, days=7):
"""加载历史数据"""
try:
self.logger.info(f"正在加载{city}最近{days}天的历史数据")
# 获取最近days天的日期
dates = [(datetime.now() - pd.Timedelta(days=i)).strftime("%Y%m%d") for i in range(days)][::-1]
# 加载所有数据文件
all_data = []
for date in dates:
filename = f"data/processed/weather_{city}_{date}.csv"
if os.path.exists(filename):
df = pd.read_csv(filename)
all_data.append(df)
# 合并数据
if all_data:
df_combined = pd.concat(all_data, ignore_index=True)
# 转换timestamp列为datetime类型
df_combined['timestamp'] = pd.to_datetime(df_combined['timestamp'])
self.logger.info(f"成功加载{len(df_combined)}条历史数据")
return df_combined
else:
self.logger.warning(f"未找到{city}的历史数据")
return pd.DataFrame()
except Exception as e:
self.logger.error(f"加载历史数据失败: {e}")
raise
def analyze_data(self, df):
"""分析数据"""
try:
if df.empty:
self.logger.warning("没有数据可分析")
return None
self.logger.info("正在分析天气数据")
# 计算统计信息
analysis = {
'period_start': df['timestamp'].min().strftime("%Y-%m-%d %H:%M"),
'period_end': df['timestamp'].max().strftime("%Y-%m-%d %H:%M"),
'total_records': len(df),
'avg_temp': round(df['temp'].mean(), 2),
'max_temp': round(df['temp'].max(), 2),
'min_temp': round(df['temp'].min(), 2),
'avg_humidity': round(df['humidity'].mean(), 2),
'avg_wind_speed': round(df['wind_speed'].mean(), 2),
'most_common_weather': df['weather_main'].mode().iloc[0] if not df['weather_main'].mode().empty else None,
'total_rain': round(df['rain_1h'].sum(), 2),
'total_snow': round(df['snow_1h'].sum(), 2)
}
self.logger.info("数据