第166集:系统监控脚本

1. 系统监控脚本概述

系统监控脚本是用于实时监控计算机系统运行状态的自动化工具,它可以收集、分析和报告系统资源使用情况,帮助管理员及时发现和解决系统问题,确保系统稳定运行。

应用场景:

  • 监控服务器的CPU、内存、磁盘和网络使用情况
  • 跟踪系统进程和服务状态
  • 检测系统异常事件和性能瓶颈
  • 监控日志文件并发现错误信息
  • 实现自动告警机制,及时通知管理员
  • 生成系统性能报告和趋势分析

监控指标:

  • 系统资源:CPU使用率、内存使用率、磁盘空间、网络流量
  • 进程信息:进程数量、CPU/内存占用高的进程、关键进程状态
  • 服务状态:Web服务器、数据库服务、应用服务等是否正常运行
  • 系统事件:错误日志、登录尝试、文件系统变化
  • 性能指标:响应时间、吞吐量、错误率

2. 核心库介绍

Python提供了多个用于系统监控的库,其中最常用的是psutil(Python System and Process Utilities)。

2.1 psutil - 系统和进程工具

psutil是一个跨平台的库,用于获取系统信息和进程详细信息。

安装:

pip install psutil

主要功能:

  • 系统CPU信息和使用率
  • 内存使用情况(物理内存和虚拟内存)
  • 磁盘分区和使用率
  • 网络接口和流量统计
  • 进程列表和详细信息
  • 系统启动时间和用户信息

2.2 platform - 系统平台信息

platform是Python的标准库,用于获取系统平台和版本信息。

2.3 logging - 日志记录

logging是Python的标准库,用于记录监控数据和事件。

2.4 time - 时间处理

time是Python的标准库,用于时间戳和时间格式化。

2.5 datetime - 日期时间处理

datetime是Python的标准库,用于更复杂的日期时间操作。

3. 系统信息获取

3.1 获取系统基本信息

import platform
import psutil
import datetime

def get_system_info():
    """获取系统基本信息"""
    info = {
        "系统名称": platform.system(),
        "系统版本": platform.version(),
        "系统架构": platform.machine(),
        "Python版本": platform.python_version(),
        "CPU型号": platform.processor() if platform.system() != "Windows" else platform.machine(),
        "物理核心数": psutil.cpu_count(logical=False),
        "逻辑核心数": psutil.cpu_count(logical=True),
        "总内存": f"{round(psutil.virtual_memory().total / (1024 ** 3), 2)} GB",
        "开机时间": datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")
    }
    
    return info

# 打印系统信息
for key, value in get_system_info().items():
    print(f"{key}: {value}")

3.2 CPU监控

import psutil
import time

def get_cpu_info():
    """获取CPU信息"""
    cpu_info = {
        "CPU使用率(总体)": f"{psutil.cpu_percent(interval=1)}%",
        "CPU使用率(每核)": [f"{percent}%" for percent in psutil.cpu_percent(interval=1, percpu=True)],
        "CPU频率": f"{round(psutil.cpu_freq().current / 1000, 2)} GHz"
    }
    
    return cpu_info

# 实时监控CPU使用率
print("实时监控CPU使用率(按Ctrl+C停止):")
try:
    while True:
        cpu_percent = psutil.cpu_percent(interval=1)
        print(f"CPU使用率: {cpu_percent}%", end="\r")
        time.sleep(1)
except KeyboardInterrupt:
    print("\n监控结束")

3.3 内存监控

import psutil

def get_memory_info():
    """获取内存信息"""
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    
    memory_info = {
        "总物理内存": f"{round(mem.total / (1024 ** 3), 2)} GB",
        "已用物理内存": f"{round(mem.used / (1024 ** 3), 2)} GB",
        "可用物理内存": f"{round(mem.available / (1024 ** 3), 2)} GB",
        "物理内存使用率": f"{mem.percent}%",
        "总虚拟内存": f"{round(swap.total / (1024 ** 3), 2)} GB",
        "已用虚拟内存": f"{round(swap.used / (1024 ** 3), 2)} GB",
        "虚拟内存使用率": f"{swap.percent}%"
    }
    
    return memory_info

# 打印内存信息
for key, value in get_memory_info().items():
    print(f"{key}: {value}")

3.4 磁盘监控

import psutil

def get_disk_info():
    """获取磁盘信息"""
    disk_info = {}
    
    # 获取所有磁盘分区
    partitions = psutil.disk_partitions(all=False)  # all=False表示只显示物理分区
    
    for partition in partitions:
        try:
            usage = psutil.disk_usage(partition.mountpoint)
            disk_info[partition.device] = {
                "挂载点": partition.mountpoint,
                "文件系统": partition.fstype,
                "总容量": f"{round(usage.total / (1024 ** 3), 2)} GB",
                "已用容量": f"{round(usage.used / (1024 ** 3), 2)} GB",
                "可用容量": f"{round(usage.free / (1024 ** 3), 2)} GB",
                "使用率": f"{usage.percent}%"
            }
        except PermissionError:
            # 某些分区可能没有权限访问
            continue
    
    return disk_info

# 打印磁盘信息
disk_info = get_disk_info()
for device, info in disk_info.items():
    print(f"\n磁盘: {device}")
    for key, value in info.items():
        print(f"  {key}: {value}")

3.5 网络监控

import psutil

def get_network_info():
    """获取网络信息"""
    # 获取网络接口信息
    net_interfaces = psutil.net_if_addrs()
    # 获取网络统计信息
    net_stats = psutil.net_io_counters(pernic=True)  # pernic=True表示按接口统计
    
    network_info = {}
    
    for interface, addresses in net_interfaces.items():
        # 获取IP地址
        ip_addresses = []
        for addr in addresses:
            if addr.family.name == 'AF_INET':  # IPv4地址
                ip_addresses.append(addr.address)
            elif addr.family.name == 'AF_INET6':  # IPv6地址
                ip_addresses.append(f"IPv6: {addr.address}")
        
        # 获取网络流量
        if interface in net_stats:
            stats = net_stats[interface]
            traffic_info = {
                "发送字节数": f"{round(stats.bytes_sent / (1024 ** 2), 2)} MB",
                "接收字节数": f"{round(stats.bytes_recv / (1024 ** 2), 2)} MB",
                "发送数据包数": stats.packets_sent,
                "接收数据包数": stats.packets_recv,
                "发送错误数": stats.errin,
                "接收错误数": stats.errout,
                "丢弃数据包数": stats.dropin + stats.dropout
            }
        else:
            traffic_info = {"说明": "无流量统计数据"}
        
        network_info[interface] = {
            "IP地址": ip_addresses,
            "流量统计": traffic_info
        }
    
    return network_info

# 打印网络信息
network_info = get_network_info()
for interface, info in network_info.items():
    print(f"\n网络接口: {interface}")
    print(f"  IP地址: {', '.join(info['IP地址'])}")
    print("  流量统计:")
    for key, value in info['流量统计'].items():
        print(f"    {key}: {value}")

4. 进程监控

4.1 进程列表和信息

import psutil

def get_process_info():
    """获取进程信息"""
    processes = []
    
    # 获取所有进程
    for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent']):
        try:
            # 获取进程详细信息
            process_info = proc.info
            processes.append(process_info)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # 忽略无法访问的进程
            continue
    
    # 按CPU使用率排序(降序)
    processes.sort(key=lambda x: x['cpu_percent'] if x['cpu_percent'] is not None else 0, reverse=True)
    
    return processes[:10]  # 返回CPU使用率最高的10个进程

# 打印进程信息
print("CPU使用率最高的10个进程:")
print("PID\t名称\t\t\tCPU%\t内存%\t用户名")
print("-" * 80)

for proc in get_process_info():
    pid = proc['pid']
    name = proc['name']
    cpu = proc['cpu_percent'] or 0.0
    memory = proc['memory_percent'] or 0.0
    username = proc['username'] or "未知"
    
    # 调整输出格式
    name_display = name[:20].ljust(20)
    print(f"{pid}\t{name_display}\t{cpu:.1f}%\t{memory:.1f}%\t{username}")

4.2 监控特定进程

import psutil
import time

def monitor_process(process_name):
    """
    监控特定进程
    :param process_name: 进程名称
    """
    print(f"监控进程 '{process_name}'(按Ctrl+C停止):")
    
    try:
        while True:
            found = False
            for proc in psutil.process_iter(['name', 'pid', 'cpu_percent', 'memory_percent']):
                try:
                    if process_name.lower() in proc.info['name'].lower():
                        found = True
                        pid = proc.info['pid']
                        cpu = proc.info['cpu_percent'] or 0.0
                        memory = proc.info['memory_percent'] or 0.0
                        print(f"进程 '{proc.info['name']}' (PID: {pid}) - CPU: {cpu:.1f}%, 内存: {memory:.1f}%", end="\r")
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue
            
            if not found:
                print(f"进程 '{process_name}' 未运行", end="\r")
            
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n监控结束")

# 监控特定进程
monitor_process("chrome")  # 监控Chrome浏览器进程

5. 日志文件监控

5.1 实时监控日志文件

import time
import re

def monitor_log_file(log_file_path, pattern=None):
    """
    实时监控日志文件
    :param log_file_path: 日志文件路径
    :param pattern: 正则表达式模式,用于过滤特定内容
    """
    print(f"实时监控日志文件: {log_file_path}(按Ctrl+C停止)")
    
    try:
        with open(log_file_path, 'r') as file:
            # 移动到文件末尾
            file.seek(0, 2)
            
            while True:
                line = file.readline()
                if not line:
                    # 没有新内容,等待
                    time.sleep(0.5)
                    continue
                
                # 如果指定了模式,只显示匹配的行
                if pattern:
                    if re.search(pattern, line):
                        print(line.strip())
                else:
                    print(line.strip())
    except FileNotFoundError:
        print(f"错误: 日志文件 '{log_file_path}' 不存在")
    except KeyboardInterrupt:
        print("\n监控结束")
    except Exception as e:
        print(f"错误: {e}")

# 监控日志文件示例
# monitor_log_file("/var/log/syslog", pattern="error|warning|critical")
monitor_log_file("test.log")  # 监控当前目录下的test.log文件

5.2 分析日志文件

import re
from collections import Counter

def analyze_log_file(log_file_path, pattern):
    """
    分析日志文件,统计匹配内容
    :param log_file_path: 日志文件路径
    :param pattern: 正则表达式模式
    :return: 匹配结果统计
    """
    counter = Counter()
    
    try:
        with open(log_file_path, 'r') as file:
            for line in file:
                match = re.search(pattern, line)
                if match:
                    # 提取匹配的内容
                    matched_content = match.group()
                    counter[matched_content] += 1
        
        return counter
    except FileNotFoundError:
        print(f"错误: 日志文件 '{log_file_path}' 不存在")
        return Counter()
    except Exception as e:
        print(f"错误: {e}")
        return Counter()

# 分析日志文件示例
# log_analysis = analyze_log_file("/var/log/syslog", r"(error|warning|critical)")
# print("日志错误级别统计:")
# for level, count in log_analysis.items():
#     print(f"  {level}: {count}次")

6. 监控告警机制

6.1 邮件告警

结合第165集的邮件自动化内容,我们可以实现监控告警功能。

import psutil
import smtplib
from email.mime.text import MIMEText
from email.header import Header

def send_alert_email(subject, message):
    """
    发送告警邮件
    :param subject: 邮件主题
    :param message: 邮件内容
    :return: 是否发送成功
    """
    try:
        # 邮件服务器配置
        mail_host = "smtp.example.com"
        mail_user = "your_email@example.com"
        mail_pass = "your_password"
        sender = "your_email@example.com"
        receivers = ["admin@example.com"]
        
        # 创建邮件对象
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['From'] = Header("系统监控脚本", 'utf-8')
        msg['To'] = Header("系统管理员", 'utf-8')
        msg['Subject'] = Header(subject, 'utf-8')
        
        # 发送邮件
        with smtplib.SMTP_SSL(mail_host, 465) as smtp:
            smtp.login(mail_user, mail_pass)
            smtp.sendmail(sender, receivers, msg.as_string())
        
        print(f"✓ 告警邮件发送成功:{subject}")
        return True
    except Exception as e:
        print(f"✗ 告警邮件发送失败:{e}")
        return False

def check_system_health():
    """
    检查系统健康状态
    """
    alerts = []
    
    # 检查CPU使用率
    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > 80:
        alerts.append(f"CPU使用率过高:{cpu_percent}%")
    
    # 检查内存使用率
    mem_percent = psutil.virtual_memory().percent
    if mem_percent > 80:
        alerts.append(f"内存使用率过高:{mem_percent}%")
    
    # 检查磁盘空间
    for partition in psutil.disk_partitions(all=False):
        try:
            usage = psutil.disk_usage(partition.mountpoint)
            if usage.percent > 85:
                alerts.append(f"磁盘空间不足 ({partition.mountpoint}):{usage.percent}%")
        except PermissionError:
            continue
    
    # 检查关键进程
    critical_processes = ["nginx", "mysql", "ssh"]
    for proc_name in critical_processes:
        found = False
        for proc in psutil.process_iter(['name']):
            try:
                if proc_name in proc.info['name'].lower():
                    found = True
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        
        if not found:
            alerts.append(f"关键进程未运行:{proc_name}")
    
    return alerts

# 检查系统健康状态并发送告警
alerts = check_system_health()
if alerts:
    alert_message = "发现以下系统异常:\n\n" + "\n".join([f"- {alert}" for alert in alerts])
    send_alert_email("系统健康告警", alert_message)
else:
    print("系统健康状态正常")

7. 综合监控系统

7.1 创建完整的系统监控脚本

import psutil
import time
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    filename='system_monitor.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def get_system_stats():
    """获取系统统计信息"""
    cpu_percent = psutil.cpu_percent(interval=1)
    mem_percent = psutil.virtual_memory().percent
    disk_percent = psutil.disk_usage('/').percent
    
    # 获取网络流量
    net_io = psutil.net_io_counters()
    bytes_sent = round(net_io.bytes_sent / (1024 ** 2), 2)
    bytes_recv = round(net_io.bytes_recv / (1024 ** 2), 2)
    
    # 获取进程数
    process_count = len(list(psutil.process_iter()))
    
    stats = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "cpu_percent": cpu_percent,
        "mem_percent": mem_percent,
        "disk_percent": disk_percent,
        "bytes_sent": bytes_sent,
        "bytes_recv": bytes_recv,
        "process_count": process_count
    }
    
    return stats

def log_system_stats(stats):
    """记录系统统计信息到日志"""
    log_message = (f"CPU: {stats['cpu_percent']}%, "
                   f"内存: {stats['mem_percent']}%, "
                   f"磁盘: {stats['disk_percent']}%, "
                   f"发送: {stats['bytes_sent']} MB, "
                   f"接收: {stats['bytes_recv']} MB, "
                   f"进程数: {stats['process_count']}")
    
    logging.info(log_message)
    print(f"{stats['timestamp']} - {log_message}")

def check_thresholds(stats):
    """检查阈值并发送告警"""
    thresholds = {
        "cpu_percent": 80,
        "mem_percent": 80,
        "disk_percent": 85
    }
    
    alerts = []
    
    if stats['cpu_percent'] > thresholds['cpu_percent']:
        alerts.append(f"CPU使用率过高: {stats['cpu_percent']}%")
    
    if stats['mem_percent'] > thresholds['mem_percent']:
        alerts.append(f"内存使用率过高: {stats['mem_percent']}%")
    
    if stats['disk_percent'] > thresholds['disk_percent']:
        alerts.append(f"磁盘使用率过高: {stats['disk_percent']}%")
    
    if alerts:
        alert_message = "系统监控告警:\n\n" + "\n".join([f"- {alert}" for alert in alerts])
        logging.warning(alert_message)
        print(f"⚠️  告警: {alert_message}")
        # 这里可以添加发送邮件告警的代码

def main():
    """主函数"""
    print("系统监控脚本启动(按Ctrl+C停止)")
    logging.info("系统监控脚本启动")
    
    try:
        while True:
            stats = get_system_stats()
            log_system_stats(stats)
            check_thresholds(stats)
            time.sleep(60)  # 每分钟检查一次
    except KeyboardInterrupt:
        print("\n系统监控脚本停止")
        logging.info("系统监控脚本停止")
    except Exception as e:
        logging.error(f"脚本执行错误: {e}")
        print(f"错误: {e}")

if __name__ == "__main__":
    main()

8. 系统监控脚本最佳实践

8.1 性能考虑

  • 合理设置监控间隔:监控间隔过短会增加系统开销,过长则可能错过重要事件
  • 选择性监控:只监控必要的指标,避免收集过多冗余数据
  • 资源限制:设置脚本的CPU和内存使用限制
  • 批量处理:对大量数据进行批量处理,减少I/O操作

8.2 可靠性设计

  • 错误处理:对所有可能的错误进行捕获和处理
  • 日志记录:详细记录脚本运行状态和错误信息
  • 自动恢复:实现脚本的自动重启机制
  • 冗余设计:关键监控指标使用多种方式验证

8.3 安全性考虑

  • 权限控制:脚本以最小必要权限运行
  • 数据加密:监控数据和告警信息进行加密传输
  • 访问控制:限制监控界面和数据的访问权限
  • 日志安全:保护监控日志,防止篡改和泄露

8.4 可维护性

  • 模块化设计:将不同功能封装为函数或类
  • 配置文件:将参数和阈值放在配置文件中,便于修改
  • 文档注释:详细的代码注释和使用文档
  • 版本控制:使用Git等工具进行版本管理

8.5 扩展性

  • 插件架构:支持通过插件扩展监控功能
  • 数据接口:提供API接口,方便与其他系统集成
  • 可视化支持:支持将监控数据导出为图表或报表
  • 告警渠道:支持多种告警方式(邮件、短信、微信等)

9. 总结

系统监控脚本是维护系统稳定运行的重要工具,通过Python的psutil库,我们可以轻松实现对系统资源、进程、网络和日志的全面监控。结合告警机制,可以及时发现和解决系统问题,确保系统的可靠性和性能。

关键要点:

  • 使用psutil库获取系统和进程信息
  • 监控CPU、内存、磁盘、网络等关键指标
  • 跟踪进程状态和资源使用情况
  • 实时监控日志文件并发现异常
  • 实现阈值检查和自动告警机制
  • 遵循最佳实践,确保监控脚本的性能和可靠性

通过不断练习和实践,你可以根据实际需求定制更复杂的系统监控脚本,为系统的稳定运行提供保障。

« 上一篇 邮件自动化 下一篇 » 定时任务