第170集:自动化综合项目

一、项目概述

在前面的课程中,我们学习了多种Python自动化技术,包括文件操作自动化、数据处理自动化、网络请求自动化、错误处理自动化、日志处理自动化和定时任务自动化等。本节课,我们将把这些技术整合起来,完成一个完整的自动化综合项目——天气数据自动化收集与分析系统

1.1 项目目标

本项目旨在创建一个能够自动收集天气数据、进行分析处理、生成可视化报告,并定期发送给用户的自动化系统。通过这个项目,你将学习如何将各种自动化技术整合到一个完整的应用中。

1.2 项目特点

  • 全自动化:从数据收集到报告生成完全自动化,无需人工干预
  • 模块化设计:系统分为多个功能模块,便于维护和扩展
  • 错误处理完善:包含全面的错误处理机制,确保系统稳定运行
  • 日志记录详细:记录系统运行的全过程,便于监控和调试
  • 可视化展示:生成直观的数据可视化图表,便于数据分析

二、项目需求分析

2.1 功能需求

  1. 数据收集模块

    • 自动从天气API获取指定城市的天气数据
    • 支持配置多个城市
    • 定期执行数据收集任务
  2. 数据处理模块

    • 解析和清洗收集到的天气数据
    • 存储数据到本地文件或数据库
    • 对历史数据进行统计分析
  3. 可视化模块

    • 生成天气趋势图表(温度、湿度、降水量等)
    • 生成天气状况统计图表
    • 支持自定义图表样式
  4. 报告生成模块

    • 将分析结果和可视化图表整合到报告中
    • 支持多种报告格式(如HTML、PDF等)
  5. 通知模块

    • 定期将报告发送给指定用户
    • 支持多种通知方式(如邮件、短信等)
  6. 配置管理模块

    • 支持通过配置文件或命令行参数配置系统
    • 支持动态修改配置

2.2 非功能需求

  1. 稳定性:系统能够稳定运行,处理各种异常情况
  2. 可扩展性:支持添加新的数据源和分析功能
  3. 可维护性:代码结构清晰,便于维护和升级
  4. 性能:系统运行高效,资源占用合理
  5. 安全性:保护用户的个人信息和API密钥

三、系统设计

3.1 系统架构

┌─────────────────────────────────────────────────────────────────┐
│                        自动化综合项目                            │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ 数据收集模块 │ 数据处理模块 │ 可视化模块 │ 报告生成模块 │ 通知模块 │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│    API请求    │    数据解析    │    图表生成    │    报告整合    │    邮件发送    │
│    数据获取    │    数据清洗    │    数据可视化   │    格式转换    │    短信发送    │
│    定时任务    │    数据存储    │               │               │               │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘
        │            │              │              │            │
        └────────────┴──────────────┴──────────────┴────────────┘
                          │
                     ┌─────────────┐
                     │ 配置管理模块 │
                     ├─────────────┤
                     │ 配置文件管理 │
                     │ 日志管理     │
                     │ 错误处理     │
                     └─────────────┘

3.2 模块设计

3.2.1 数据收集模块

  • 功能:从天气API获取指定城市的天气数据
  • 技术点:HTTP请求、API调用、定时任务
  • 关键函数fetch_weather_data()schedule_data_collection()

3.2.2 数据处理模块

  • 功能:解析、清洗和存储天气数据
  • 技术点:JSON解析、数据清洗、文件操作、错误处理
  • 关键函数parse_weather_data()clean_data()save_data()

3.2.3 可视化模块

  • 功能:生成天气数据可视化图表
  • 技术点:数据可视化、Matplotlib使用
  • 关键函数generate_temperature_chart()generate_humidity_chart()

3.2.4 报告生成模块

  • 功能:整合分析结果和图表,生成报告
  • 技术点:HTML生成、模板使用
  • 关键函数generate_report()format_report()

3.2.5 通知模块

  • 功能:发送报告给指定用户
  • 技术点:邮件发送、SMTP协议
  • 关键函数send_email()

3.2.6 配置管理模块

  • 功能:管理系统配置和日志
  • 技术点:配置文件解析、日志处理、错误处理
  • 关键函数load_config()setup_logging()handle_error()

3.3 数据流程

1. 定时触发数据收集任务
2. 从天气API获取原始天气数据
3. 解析和清洗获取到的数据
4. 将处理后的数据存储到本地
5. 对历史数据进行统计分析
6. 生成天气数据可视化图表
7. 整合分析结果和图表生成报告
8. 将报告发送给指定用户
9. 记录整个过程的日志

四、项目实现

4.1 技术选型

  • 编程语言:Python 3.8+
  • 主要依赖库
    • requests:用于发送HTTP请求
    • matplotlib:用于数据可视化
    • pandas:用于数据处理和分析
    • schedule:用于定时任务
    • smtplib:用于发送邮件
    • logging:用于日志记录
    • configparser:用于配置文件解析
    • json:用于JSON数据处理
    • ossys:用于系统操作

4.2 项目结构

weather_automation/
├── config/              # 配置文件目录
│   └── config.ini       # 系统配置文件
├── data/                # 数据存储目录
│   ├── raw/             # 原始数据
│   └── processed/       # 处理后的数据
├── logs/                # 日志文件目录
├── reports/             # 报告文件目录
├── charts/              # 图表文件目录
├── src/                 # 源代码目录
│   ├── __init__.py      # 包初始化文件
│   ├── config_manager.py # 配置管理模块
│   ├── data_collector.py # 数据收集模块
│   ├── data_processor.py # 数据处理模块
│   ├── visualizer.py    # 可视化模块
│   ├── report_generator.py # 报告生成模块
│   ├── notification.py  # 通知模块
│   └── main.py          # 主程序
├── requirements.txt     # 依赖库列表
└── README.md            # 项目说明文档

4.3 代码实现

由于项目代码较长,我们将分模块展示关键代码片段。完整代码请参考配套的演示文件。

4.3.1 配置管理模块 (config_manager.py)

import configparser
import logging
import os

class ConfigManager:
    """配置管理类"""
    
    def __init__(self, config_file='config/config.ini'):
        self.config_file = config_file
        self.config = None
        self.logger = self.setup_logging()
        self.load_config()
    
    def setup_logging(self):
        """设置日志记录"""
        # 创建日志目录
        if not os.path.exists('logs'):
            os.makedirs('logs')
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            filename='logs/weather_automation.log',
            filemode='a'
        )
        
        # 创建日志记录器
        logger = logging.getLogger('WeatherAutomation')
        
        # 添加控制台输出
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        
        return logger
    
    def load_config(self):
        """加载配置文件"""
        try:
            self.config = configparser.ConfigParser()
            self.config.read(self.config_file, encoding='utf-8')
            self.logger.info(f"成功加载配置文件: {self.config_file}")
        except Exception as e:
            self.logger.error(f"加载配置文件失败: {e}")
            raise
    
    def get(self, section, option):
        """获取配置值"""
        try:
            return self.config.get(section, option)
        except Exception as e:
            self.logger.error(f"获取配置失败 [{section}.{option}]: {e}")
            raise
    
    def get_int(self, section, option):
        """获取整数类型的配置值"""
        try:
            return self.config.getint(section, option)
        except Exception as e:
            self.logger.error(f"获取整数配置失败 [{section}.{option}]: {e}")
            raise
    
    def get_float(self, section, option):
        """获取浮点数类型的配置值"""
        try:
            return self.config.getfloat(section, option)
        except Exception as e:
            self.logger.error(f"获取浮点数配置失败 [{section}.{option}]: {e}")
            raise
    
    def get_boolean(self, section, option):
        """获取布尔类型的配置值"""
        try:
            return self.config.getboolean(section, option)
        except Exception as e:
            self.logger.error(f"获取布尔配置失败 [{section}.{option}]: {e}")
            raise

4.3.2 数据收集模块 (data_collector.py)

import requests
import json
import os
from datetime import datetime
from src.config_manager import ConfigManager

class DataCollector:
    """数据收集类"""
    
    def __init__(self, config_manager):
        self.config = config_manager
        self.logger = self.config.logger
        self.api_key = self.config.get('WeatherAPI', 'api_key')
        self.city = self.config.get('WeatherAPI', 'city')
        self.base_url = self.config.get('WeatherAPI', 'base_url')
        
        # 创建数据目录
        if not os.path.exists('data/raw'):
            os.makedirs('data/raw')
    
    def fetch_weather_data(self):
        """从天气API获取数据"""
        try:
            # 构建请求URL
            url = f"{self.base_url}?q={self.city}&appid={self.api_key}&units=metric&lang=zh_cn"
            
            # 发送请求
            self.logger.info(f"正在获取{self.city}的天气数据")
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # 检查HTTP状态码
            
            # 解析响应数据
            data = response.json()
            self.logger.info(f"成功获取{self.city}的天气数据")
            
            # 保存原始数据
            self.save_raw_data(data)
            
            return data
        except requests.exceptions.HTTPError as e:
            self.logger.error(f"HTTP错误: {e}")
            raise
        except requests.exceptions.ConnectionError as e:
            self.logger.error(f"连接错误: {e}")
            raise
        except requests.exceptions.Timeout as e:
            self.logger.error(f"超时错误: {e}")
            raise
        except Exception as e:
            self.logger.error(f"获取天气数据失败: {e}")
            raise
    
    def save_raw_data(self, data):
        """保存原始数据"""
        try:
            # 生成文件名(基于当前时间)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"data/raw/weather_{self.city}_{timestamp}.json"
            
            # 保存数据
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            
            self.logger.info(f"原始数据已保存到: {filename}")
        except Exception as e:
            self.logger.error(f"保存原始数据失败: {e}")
            raise

4.3.3 数据处理模块 (data_processor.py)

import json
import os
import pandas as pd
from datetime import datetime
from src.config_manager import ConfigManager

class DataProcessor:
    """数据处理类"""
    
    def __init__(self, config_manager):
        self.config = config_manager
        self.logger = self.config.logger
        
        # 创建数据目录
        if not os.path.exists('data/processed'):
            os.makedirs('data/processed')
    
    def parse_weather_data(self, raw_data):
        """解析天气数据"""
        try:
            self.logger.info("正在解析天气数据")
            
            # 提取基本信息
            city = raw_data['name']
            country = raw_data['sys']['country']
            timestamp = datetime.fromtimestamp(raw_data['dt'])
            
            # 提取当前天气信息
            current_weather = raw_data['weather'][0]
            weather_main = current_weather['main']
            weather_description = current_weather['description']
            
            # 提取温度信息
            main = raw_data['main']
            temp = main['temp']
            temp_min = main['temp_min']
            temp_max = main['temp_max']
            humidity = main['humidity']
            
            # 提取风速信息
            wind_speed = raw_data['wind']['speed']
            
            # 提取云层信息
            clouds = raw_data['clouds']['all']
            
            # 提取降水信息
            rain = raw_data.get('rain', {}).get('1h', 0)
            snow = raw_data.get('snow', {}).get('1h', 0)
            
            # 构建处理后的数据
            processed_data = {
                'city': city,
                'country': country,
                'timestamp': timestamp,
                'date': timestamp.date(),
                'time': timestamp.time(),
                'weather_main': weather_main,
                'weather_description': weather_description,
                'temp': temp,
                'temp_min': temp_min,
                'temp_max': temp_max,
                'humidity': humidity,
                'wind_speed': wind_speed,
                'clouds': clouds,
                'rain_1h': rain,
                'snow_1h': snow
            }
            
            self.logger.info("天气数据解析完成")
            return processed_data
        except KeyError as e:
            self.logger.error(f"解析天气数据失败,缺少字段: {e}")
            raise
        except Exception as e:
            self.logger.error(f"解析天气数据失败: {e}")
            raise
    
    def save_processed_data(self, data):
        """保存处理后的数据"""
        try:
            self.logger.info("正在保存处理后的数据")
            
            # 生成CSV文件名(按日期)
            date_str = data['date'].strftime("%Y%m%d")
            filename = f"data/processed/weather_{data['city']}_{date_str}.csv"
            
            # 转换为DataFrame
            df = pd.DataFrame([data])
            
            # 保存到CSV文件(如果文件已存在,则追加)
            if os.path.exists(filename):
                df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8')
            else:
                df.to_csv(filename, index=False, encoding='utf-8')
            
            self.logger.info(f"处理后的数据已保存到: {filename}")
            return filename
        except Exception as e:
            self.logger.error(f"保存处理后的数据失败: {e}")
            raise
    
    def load_historical_data(self, city, days=7):
        """加载历史数据"""
        try:
            self.logger.info(f"正在加载{city}最近{days}天的历史数据")
            
            # 获取最近days天的日期
            dates = [(datetime.now() - pd.Timedelta(days=i)).strftime("%Y%m%d") for i in range(days)][::-1]
            
            # 加载所有数据文件
            all_data = []
            for date in dates:
                filename = f"data/processed/weather_{city}_{date}.csv"
                if os.path.exists(filename):
                    df = pd.read_csv(filename)
                    all_data.append(df)
            
            # 合并数据
            if all_data:
                df_combined = pd.concat(all_data, ignore_index=True)
                # 转换timestamp列为datetime类型
                df_combined['timestamp'] = pd.to_datetime(df_combined['timestamp'])
                self.logger.info(f"成功加载{len(df_combined)}条历史数据")
                return df_combined
            else:
                self.logger.warning(f"未找到{city}的历史数据")
                return pd.DataFrame()
        except Exception as e:
            self.logger.error(f"加载历史数据失败: {e}")
            raise
    
    def analyze_data(self, df):
        """分析数据"""
        try:
            if df.empty:
                self.logger.warning("没有数据可分析")
                return None
            
            self.logger.info("正在分析天气数据")
            
            # 计算统计信息
            analysis = {
                'period_start': df['timestamp'].min().strftime("%Y-%m-%d %H:%M"),
                'period_end': df['timestamp'].max().strftime("%Y-%m-%d %H:%M"),
                'total_records': len(df),
                'avg_temp': round(df['temp'].mean(), 2),
                'max_temp': round(df['temp'].max(), 2),
                'min_temp': round(df['temp'].min(), 2),
                'avg_humidity': round(df['humidity'].mean(), 2),
                'avg_wind_speed': round(df['wind_speed'].mean(), 2),
                'most_common_weather': df['weather_main'].mode().iloc[0] if not df['weather_main'].mode().empty else None,
                'total_rain': round(df['rain_1h'].sum(), 2),
                'total_snow': round(df['snow_1h'].sum(), 2)
            }
            
            self.logger.info("数据
« 上一篇 错误处理自动化 下一篇 » GUI编程概念