第166集:系统监控脚本
1. 系统监控脚本概述
系统监控脚本是用于实时监控计算机系统运行状态的自动化工具,它可以收集、分析和报告系统资源使用情况,帮助管理员及时发现和解决系统问题,确保系统稳定运行。
应用场景:
- 监控服务器的CPU、内存、磁盘和网络使用情况
- 跟踪系统进程和服务状态
- 检测系统异常事件和性能瓶颈
- 监控日志文件并发现错误信息
- 实现自动告警机制,及时通知管理员
- 生成系统性能报告和趋势分析
监控指标:
- 系统资源:CPU使用率、内存使用率、磁盘空间、网络流量
- 进程信息:进程数量、CPU/内存占用高的进程、关键进程状态
- 服务状态:Web服务器、数据库服务、应用服务等是否正常运行
- 系统事件:错误日志、登录尝试、文件系统变化
- 性能指标:响应时间、吞吐量、错误率
2. 核心库介绍
Python提供了多个用于系统监控的库,其中最常用的是psutil(Python System and Process Utilities)。
2.1 psutil - 系统和进程工具
psutil是一个跨平台的库,用于获取系统信息和进程详细信息。
安装:
pip install psutil主要功能:
- 系统CPU信息和使用率
- 内存使用情况(物理内存和虚拟内存)
- 磁盘分区和使用率
- 网络接口和流量统计
- 进程列表和详细信息
- 系统启动时间和用户信息
2.2 platform - 系统平台信息
platform是Python的标准库,用于获取系统平台和版本信息。
2.3 logging - 日志记录
logging是Python的标准库,用于记录监控数据和事件。
2.4 time - 时间处理
time是Python的标准库,用于时间戳和时间格式化。
2.5 datetime - 日期时间处理
datetime是Python的标准库,用于更复杂的日期时间操作。
3. 系统信息获取
3.1 获取系统基本信息
import platform
import psutil
import datetime
def get_system_info():
"""获取系统基本信息"""
info = {
"系统名称": platform.system(),
"系统版本": platform.version(),
"系统架构": platform.machine(),
"Python版本": platform.python_version(),
"CPU型号": platform.processor() if platform.system() != "Windows" else platform.machine(),
"物理核心数": psutil.cpu_count(logical=False),
"逻辑核心数": psutil.cpu_count(logical=True),
"总内存": f"{round(psutil.virtual_memory().total / (1024 ** 3), 2)} GB",
"开机时间": datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")
}
return info
# 打印系统信息
for key, value in get_system_info().items():
print(f"{key}: {value}")3.2 CPU监控
import psutil
import time
def get_cpu_info():
"""获取CPU信息"""
cpu_info = {
"CPU使用率(总体)": f"{psutil.cpu_percent(interval=1)}%",
"CPU使用率(每核)": [f"{percent}%" for percent in psutil.cpu_percent(interval=1, percpu=True)],
"CPU频率": f"{round(psutil.cpu_freq().current / 1000, 2)} GHz"
}
return cpu_info
# 实时监控CPU使用率
print("实时监控CPU使用率(按Ctrl+C停止):")
try:
while True:
cpu_percent = psutil.cpu_percent(interval=1)
print(f"CPU使用率: {cpu_percent}%", end="\r")
time.sleep(1)
except KeyboardInterrupt:
print("\n监控结束")3.3 内存监控
import psutil
def get_memory_info():
"""获取内存信息"""
mem = psutil.virtual_memory()
swap = psutil.swap_memory()
memory_info = {
"总物理内存": f"{round(mem.total / (1024 ** 3), 2)} GB",
"已用物理内存": f"{round(mem.used / (1024 ** 3), 2)} GB",
"可用物理内存": f"{round(mem.available / (1024 ** 3), 2)} GB",
"物理内存使用率": f"{mem.percent}%",
"总虚拟内存": f"{round(swap.total / (1024 ** 3), 2)} GB",
"已用虚拟内存": f"{round(swap.used / (1024 ** 3), 2)} GB",
"虚拟内存使用率": f"{swap.percent}%"
}
return memory_info
# 打印内存信息
for key, value in get_memory_info().items():
print(f"{key}: {value}")3.4 磁盘监控
import psutil
def get_disk_info():
"""获取磁盘信息"""
disk_info = {}
# 获取所有磁盘分区
partitions = psutil.disk_partitions(all=False) # all=False表示只显示物理分区
for partition in partitions:
try:
usage = psutil.disk_usage(partition.mountpoint)
disk_info[partition.device] = {
"挂载点": partition.mountpoint,
"文件系统": partition.fstype,
"总容量": f"{round(usage.total / (1024 ** 3), 2)} GB",
"已用容量": f"{round(usage.used / (1024 ** 3), 2)} GB",
"可用容量": f"{round(usage.free / (1024 ** 3), 2)} GB",
"使用率": f"{usage.percent}%"
}
except PermissionError:
# 某些分区可能没有权限访问
continue
return disk_info
# 打印磁盘信息
disk_info = get_disk_info()
for device, info in disk_info.items():
print(f"\n磁盘: {device}")
for key, value in info.items():
print(f" {key}: {value}")3.5 网络监控
import psutil
def get_network_info():
"""获取网络信息"""
# 获取网络接口信息
net_interfaces = psutil.net_if_addrs()
# 获取网络统计信息
net_stats = psutil.net_io_counters(pernic=True) # pernic=True表示按接口统计
network_info = {}
for interface, addresses in net_interfaces.items():
# 获取IP地址
ip_addresses = []
for addr in addresses:
if addr.family.name == 'AF_INET': # IPv4地址
ip_addresses.append(addr.address)
elif addr.family.name == 'AF_INET6': # IPv6地址
ip_addresses.append(f"IPv6: {addr.address}")
# 获取网络流量
if interface in net_stats:
stats = net_stats[interface]
traffic_info = {
"发送字节数": f"{round(stats.bytes_sent / (1024 ** 2), 2)} MB",
"接收字节数": f"{round(stats.bytes_recv / (1024 ** 2), 2)} MB",
"发送数据包数": stats.packets_sent,
"接收数据包数": stats.packets_recv,
"发送错误数": stats.errin,
"接收错误数": stats.errout,
"丢弃数据包数": stats.dropin + stats.dropout
}
else:
traffic_info = {"说明": "无流量统计数据"}
network_info[interface] = {
"IP地址": ip_addresses,
"流量统计": traffic_info
}
return network_info
# 打印网络信息
network_info = get_network_info()
for interface, info in network_info.items():
print(f"\n网络接口: {interface}")
print(f" IP地址: {', '.join(info['IP地址'])}")
print(" 流量统计:")
for key, value in info['流量统计'].items():
print(f" {key}: {value}")4. 进程监控
4.1 进程列表和信息
import psutil
def get_process_info():
"""获取进程信息"""
processes = []
# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent']):
try:
# 获取进程详细信息
process_info = proc.info
processes.append(process_info)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# 忽略无法访问的进程
continue
# 按CPU使用率排序(降序)
processes.sort(key=lambda x: x['cpu_percent'] if x['cpu_percent'] is not None else 0, reverse=True)
return processes[:10] # 返回CPU使用率最高的10个进程
# 打印进程信息
print("CPU使用率最高的10个进程:")
print("PID\t名称\t\t\tCPU%\t内存%\t用户名")
print("-" * 80)
for proc in get_process_info():
pid = proc['pid']
name = proc['name']
cpu = proc['cpu_percent'] or 0.0
memory = proc['memory_percent'] or 0.0
username = proc['username'] or "未知"
# 调整输出格式
name_display = name[:20].ljust(20)
print(f"{pid}\t{name_display}\t{cpu:.1f}%\t{memory:.1f}%\t{username}")4.2 监控特定进程
import psutil
import time
def monitor_process(process_name):
"""
监控特定进程
:param process_name: 进程名称
"""
print(f"监控进程 '{process_name}'(按Ctrl+C停止):")
try:
while True:
found = False
for proc in psutil.process_iter(['name', 'pid', 'cpu_percent', 'memory_percent']):
try:
if process_name.lower() in proc.info['name'].lower():
found = True
pid = proc.info['pid']
cpu = proc.info['cpu_percent'] or 0.0
memory = proc.info['memory_percent'] or 0.0
print(f"进程 '{proc.info['name']}' (PID: {pid}) - CPU: {cpu:.1f}%, 内存: {memory:.1f}%", end="\r")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if not found:
print(f"进程 '{process_name}' 未运行", end="\r")
time.sleep(1)
except KeyboardInterrupt:
print("\n监控结束")
# 监控特定进程
monitor_process("chrome") # 监控Chrome浏览器进程5. 日志文件监控
5.1 实时监控日志文件
import time
import re
def monitor_log_file(log_file_path, pattern=None):
"""
实时监控日志文件
:param log_file_path: 日志文件路径
:param pattern: 正则表达式模式,用于过滤特定内容
"""
print(f"实时监控日志文件: {log_file_path}(按Ctrl+C停止)")
try:
with open(log_file_path, 'r') as file:
# 移动到文件末尾
file.seek(0, 2)
while True:
line = file.readline()
if not line:
# 没有新内容,等待
time.sleep(0.5)
continue
# 如果指定了模式,只显示匹配的行
if pattern:
if re.search(pattern, line):
print(line.strip())
else:
print(line.strip())
except FileNotFoundError:
print(f"错误: 日志文件 '{log_file_path}' 不存在")
except KeyboardInterrupt:
print("\n监控结束")
except Exception as e:
print(f"错误: {e}")
# 监控日志文件示例
# monitor_log_file("/var/log/syslog", pattern="error|warning|critical")
monitor_log_file("test.log") # 监控当前目录下的test.log文件5.2 分析日志文件
import re
from collections import Counter
def analyze_log_file(log_file_path, pattern):
"""
分析日志文件,统计匹配内容
:param log_file_path: 日志文件路径
:param pattern: 正则表达式模式
:return: 匹配结果统计
"""
counter = Counter()
try:
with open(log_file_path, 'r') as file:
for line in file:
match = re.search(pattern, line)
if match:
# 提取匹配的内容
matched_content = match.group()
counter[matched_content] += 1
return counter
except FileNotFoundError:
print(f"错误: 日志文件 '{log_file_path}' 不存在")
return Counter()
except Exception as e:
print(f"错误: {e}")
return Counter()
# 分析日志文件示例
# log_analysis = analyze_log_file("/var/log/syslog", r"(error|warning|critical)")
# print("日志错误级别统计:")
# for level, count in log_analysis.items():
# print(f" {level}: {count}次")6. 监控告警机制
6.1 邮件告警
结合第165集的邮件自动化内容,我们可以实现监控告警功能。
import psutil
import smtplib
from email.mime.text import MIMEText
from email.header import Header
def send_alert_email(subject, message):
"""
发送告警邮件
:param subject: 邮件主题
:param message: 邮件内容
:return: 是否发送成功
"""
try:
# 邮件服务器配置
mail_host = "smtp.example.com"
mail_user = "your_email@example.com"
mail_pass = "your_password"
sender = "your_email@example.com"
receivers = ["admin@example.com"]
# 创建邮件对象
msg = MIMEText(message, 'plain', 'utf-8')
msg['From'] = Header("系统监控脚本", 'utf-8')
msg['To'] = Header("系统管理员", 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
# 发送邮件
with smtplib.SMTP_SSL(mail_host, 465) as smtp:
smtp.login(mail_user, mail_pass)
smtp.sendmail(sender, receivers, msg.as_string())
print(f"✓ 告警邮件发送成功:{subject}")
return True
except Exception as e:
print(f"✗ 告警邮件发送失败:{e}")
return False
def check_system_health():
"""
检查系统健康状态
"""
alerts = []
# 检查CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > 80:
alerts.append(f"CPU使用率过高:{cpu_percent}%")
# 检查内存使用率
mem_percent = psutil.virtual_memory().percent
if mem_percent > 80:
alerts.append(f"内存使用率过高:{mem_percent}%")
# 检查磁盘空间
for partition in psutil.disk_partitions(all=False):
try:
usage = psutil.disk_usage(partition.mountpoint)
if usage.percent > 85:
alerts.append(f"磁盘空间不足 ({partition.mountpoint}):{usage.percent}%")
except PermissionError:
continue
# 检查关键进程
critical_processes = ["nginx", "mysql", "ssh"]
for proc_name in critical_processes:
found = False
for proc in psutil.process_iter(['name']):
try:
if proc_name in proc.info['name'].lower():
found = True
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not found:
alerts.append(f"关键进程未运行:{proc_name}")
return alerts
# 检查系统健康状态并发送告警
alerts = check_system_health()
if alerts:
alert_message = "发现以下系统异常:\n\n" + "\n".join([f"- {alert}" for alert in alerts])
send_alert_email("系统健康告警", alert_message)
else:
print("系统健康状态正常")7. 综合监控系统
7.1 创建完整的系统监控脚本
import psutil
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
filename='system_monitor.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def get_system_stats():
"""获取系统统计信息"""
cpu_percent = psutil.cpu_percent(interval=1)
mem_percent = psutil.virtual_memory().percent
disk_percent = psutil.disk_usage('/').percent
# 获取网络流量
net_io = psutil.net_io_counters()
bytes_sent = round(net_io.bytes_sent / (1024 ** 2), 2)
bytes_recv = round(net_io.bytes_recv / (1024 ** 2), 2)
# 获取进程数
process_count = len(list(psutil.process_iter()))
stats = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"cpu_percent": cpu_percent,
"mem_percent": mem_percent,
"disk_percent": disk_percent,
"bytes_sent": bytes_sent,
"bytes_recv": bytes_recv,
"process_count": process_count
}
return stats
def log_system_stats(stats):
"""记录系统统计信息到日志"""
log_message = (f"CPU: {stats['cpu_percent']}%, "
f"内存: {stats['mem_percent']}%, "
f"磁盘: {stats['disk_percent']}%, "
f"发送: {stats['bytes_sent']} MB, "
f"接收: {stats['bytes_recv']} MB, "
f"进程数: {stats['process_count']}")
logging.info(log_message)
print(f"{stats['timestamp']} - {log_message}")
def check_thresholds(stats):
"""检查阈值并发送告警"""
thresholds = {
"cpu_percent": 80,
"mem_percent": 80,
"disk_percent": 85
}
alerts = []
if stats['cpu_percent'] > thresholds['cpu_percent']:
alerts.append(f"CPU使用率过高: {stats['cpu_percent']}%")
if stats['mem_percent'] > thresholds['mem_percent']:
alerts.append(f"内存使用率过高: {stats['mem_percent']}%")
if stats['disk_percent'] > thresholds['disk_percent']:
alerts.append(f"磁盘使用率过高: {stats['disk_percent']}%")
if alerts:
alert_message = "系统监控告警:\n\n" + "\n".join([f"- {alert}" for alert in alerts])
logging.warning(alert_message)
print(f"⚠️ 告警: {alert_message}")
# 这里可以添加发送邮件告警的代码
def main():
"""主函数"""
print("系统监控脚本启动(按Ctrl+C停止)")
logging.info("系统监控脚本启动")
try:
while True:
stats = get_system_stats()
log_system_stats(stats)
check_thresholds(stats)
time.sleep(60) # 每分钟检查一次
except KeyboardInterrupt:
print("\n系统监控脚本停止")
logging.info("系统监控脚本停止")
except Exception as e:
logging.error(f"脚本执行错误: {e}")
print(f"错误: {e}")
if __name__ == "__main__":
main()8. 系统监控脚本最佳实践
8.1 性能考虑
- 合理设置监控间隔:监控间隔过短会增加系统开销,过长则可能错过重要事件
- 选择性监控:只监控必要的指标,避免收集过多冗余数据
- 资源限制:设置脚本的CPU和内存使用限制
- 批量处理:对大量数据进行批量处理,减少I/O操作
8.2 可靠性设计
- 错误处理:对所有可能的错误进行捕获和处理
- 日志记录:详细记录脚本运行状态和错误信息
- 自动恢复:实现脚本的自动重启机制
- 冗余设计:关键监控指标使用多种方式验证
8.3 安全性考虑
- 权限控制:脚本以最小必要权限运行
- 数据加密:监控数据和告警信息进行加密传输
- 访问控制:限制监控界面和数据的访问权限
- 日志安全:保护监控日志,防止篡改和泄露
8.4 可维护性
- 模块化设计:将不同功能封装为函数或类
- 配置文件:将参数和阈值放在配置文件中,便于修改
- 文档注释:详细的代码注释和使用文档
- 版本控制:使用Git等工具进行版本管理
8.5 扩展性
- 插件架构:支持通过插件扩展监控功能
- 数据接口:提供API接口,方便与其他系统集成
- 可视化支持:支持将监控数据导出为图表或报表
- 告警渠道:支持多种告警方式(邮件、短信、微信等)
9. 总结
系统监控脚本是维护系统稳定运行的重要工具,通过Python的psutil库,我们可以轻松实现对系统资源、进程、网络和日志的全面监控。结合告警机制,可以及时发现和解决系统问题,确保系统的可靠性和性能。
关键要点:
- 使用
psutil库获取系统和进程信息 - 监控CPU、内存、磁盘、网络等关键指标
- 跟踪进程状态和资源使用情况
- 实时监控日志文件并发现异常
- 实现阈值检查和自动告警机制
- 遵循最佳实践,确保监控脚本的性能和可靠性
通过不断练习和实践,你可以根据实际需求定制更复杂的系统监控脚本,为系统的稳定运行提供保障。