第167集:定时任务
1. 定时任务概述
定时任务是指按照预定的时间间隔或特定时间自动执行的程序或脚本。在Python中,我们可以通过多种方式实现定时任务,从简单的循环等待到复杂的调度系统。
应用场景:
- 定期备份数据库或文件
- 定时发送邮件或通知
- 定期采集网站数据或API数据
- 定时生成报表或统计数据
- 定时清理临时文件或日志
- 定时执行系统维护任务
- 定时监控系统状态并发送告警
定时任务类型:
- 间隔执行:每隔一定时间执行一次(如每5分钟)
- 定时执行:在特定时间执行(如每天上午9点)
- 周期性执行:按特定周期执行(如每周一执行)
- 延迟执行:延迟一段时间后执行(如延迟10秒后执行)
实现方式比较:
| 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| time.sleep() | 简单易用,无需额外依赖 | 精度低,不能处理复杂调度 | 简单的间隔执行任务 |
| sched模块 | 标准库,支持优先级和延迟 | API相对复杂,不支持周期性任务 | 对精度要求较高的简单任务 |
| schedule库 | 语法简洁,支持复杂调度 | 依赖外部库,精度一般 | 大多数定时任务场景 |
| APScheduler | 功能强大,支持多种调度器 | 配置复杂,学习成本高 | 复杂的企业级应用 |
| 系统定时任务 | 稳定性高,不依赖Python | 配置复杂,跨平台差异大 | 长期稳定运行的任务 |
2. 核心库介绍
2.1 time - 时间处理库
time是Python的标准库,提供了时间处理的基本功能,包括sleep函数用于实现简单的定时任务。
主要功能:
time.sleep(seconds):暂停程序执行指定的秒数time.time():获取当前时间戳time.localtime():获取本地时间time.strftime():格式化时间字符串
2.2 sched - 事件调度器
sched是Python的标准库,提供了一个通用的事件调度器,可以按时间或延迟执行任务。
主要功能:
- 创建调度器:
sched.scheduler(timefunc, delayfunc) - 调度事件:
scheduler.enter(delay, priority, action, argument=(), kwargs={}) - 运行调度器:
scheduler.run()
2.3 schedule - 简单调度库
schedule是一个第三方库,提供了简洁易用的API用于创建定时任务。
安装:
pip install schedule主要功能:
- 间隔执行:
schedule.every(interval).seconds/minutes/hours/days.do(job) - 定时执行:
schedule.every().day.at("10:30").do(job) - 周期性执行:
schedule.every().monday.do(job) - 取消任务:
schedule.cancel_job(job)
2.4 APScheduler - 高级调度库
APScheduler是一个功能强大的第三方库,支持多种调度器和触发器。
安装:
pip install apscheduler主要组件:
- **触发器(Triggers)**:确定任务何时执行(如间隔、定时、Cron表达式)
- **作业存储(Jobs Stores)**:保存任务信息(如内存、数据库)
- **执行器(Executors)**:执行任务(如线程池、进程池)
- **调度器(Schedulers)**:协调组件工作(如BackgroundScheduler、BlockingScheduler)
3. 简单定时任务实现
3.1 使用time.sleep()实现间隔执行
import time
import datetime
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("定时任务启动(按Ctrl+C停止)")
try:
while True:
job() # 执行任务
time.sleep(5) # 间隔5秒
except KeyboardInterrupt:
print("\n定时任务已停止")3.2 使用sched模块实现定时任务
import sched
import time
import datetime
# 创建调度器
scheduler = sched.scheduler(time.time, time.sleep)
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 再次调度任务(5秒后执行)
scheduler.enter(5, 1, job)
# 主程序
if __name__ == "__main__":
print("定时任务启动(按Ctrl+C停止)")
# 第一次调度任务(立即执行)
scheduler.enter(0, 1, job)
try:
scheduler.run() # 运行调度器
except KeyboardInterrupt:
print("\n定时任务已停止")4. 使用schedule库实现定时任务
4.1 间隔执行任务
import schedule
import time
import datetime
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("定时任务启动(按Ctrl+C停止)")
# 每5秒执行一次任务
schedule.every(5).seconds.do(job)
# 每2分钟执行一次任务
# schedule.every(2).minutes.do(job)
# 每1小时执行一次任务
# schedule.every().hour.do(job)
try:
while True:
schedule.run_pending() # 运行所有等待执行的任务
time.sleep(1) # 避免CPU占用过高
except KeyboardInterrupt:
print("\n定时任务已停止")4.2 定时执行任务
import schedule
import time
import datetime
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("定时任务启动(按Ctrl+C停止)")
# 每天10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每天的特定时间执行任务
# schedule.every().day.at("14:45").do(job)
# schedule.every().day.at("18:00").do(job)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n定时任务已停止")4.3 周期性执行任务
import schedule
import time
import datetime
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("定时任务启动(按Ctrl+C停止)")
# 每周一执行任务
schedule.every().monday.do(job)
# 每周一的特定时间执行任务
schedule.every().monday.at("09:00").do(job)
# 每周三执行任务
# schedule.every().wednesday.do(job)
# 每周五的特定时间执行任务
# schedule.every().friday.at("17:00").do(job)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n定时任务已停止")4.4 任务参数传递
import schedule
import time
import datetime
def job_with_params(name, message):
"""带参数的定时任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print(f"任务名称: {name}")
print(f"任务消息: {message}")
# 主程序
if __name__ == "__main__":
print("带参数的定时任务启动(按Ctrl+C停止)")
# 传递参数给任务
schedule.every(3).seconds.do(
job_with_params,
name="备份任务",
message="正在备份数据库..."
)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n定时任务已停止")4.5 取消任务
import schedule
import time
import datetime
# 全局变量用于存储任务
job_instance = None
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("可取消的定时任务启动(按Ctrl+C停止)")
# 调度任务并保存任务实例
job_instance = schedule.every(3).seconds.do(job)
try:
count = 0
while True:
schedule.run_pending()
time.sleep(1)
count += 1
# 执行5次后取消任务
if count == 15: # 约5次执行(每次间隔3秒)
print("\n取消定时任务...")
schedule.cancel_job(job_instance)
print("定时任务已取消")
break
except KeyboardInterrupt:
print("\n定时任务已停止")5. 使用APScheduler库实现高级定时任务
5.1 APScheduler概述
APScheduler(Advanced Python Scheduler)是一个功能强大的Python定时任务库,支持多种调度器和触发器。
主要特点:
- 支持多种调度器:BlockingScheduler, BackgroundScheduler, AsyncIOScheduler等
- 支持多种触发器:DateTrigger, IntervalTrigger, CronTrigger
- 支持作业存储:内存、数据库、Redis等
- 支持并发执行:线程池、进程池
- 支持任务持久化:重启后可恢复任务
- 支持任务依赖:定义任务执行顺序
5.2 安装APScheduler
pip install apscheduler5.3 使用IntervalTrigger实现间隔执行
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
import datetime
# 创建调度器
scheduler = BlockingScheduler()
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("APScheduler定时任务启动(按Ctrl+C停止)")
# 使用IntervalTrigger调度任务(每5秒执行一次)
scheduler.add_job(
job, # 任务函数
trigger=IntervalTrigger(seconds=5), # 间隔5秒
id="interval_job", # 任务ID
name="间隔执行任务", # 任务名称
replace_existing=True # 替换已存在的任务
)
try:
scheduler.start() # 运行调度器
except KeyboardInterrupt:
print("\n定时任务已停止")
scheduler.shutdown() # 关闭调度器5.4 使用CronTrigger实现复杂调度
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import datetime
# 创建调度器
scheduler = BlockingScheduler()
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
print("执行一些定时任务...")
# 主程序
if __name__ == "__main__":
print("APScheduler CronTrigger定时任务启动(按Ctrl+C停止)")
# 使用CronTrigger调度任务(每天10:30执行)
scheduler.add_job(
job,
trigger=CronTrigger(hour=10, minute=30),
id="cron_job",
name="定时执行任务",
replace_existing=True
)
# 其他CronTrigger示例:
# 每周一到周五的14:00执行
# trigger=CronTrigger(day_of_week='mon-fri', hour=14, minute=0)
# 每月1号的10:00执行
# trigger=CronTrigger(day=1, hour=10, minute=0)
# 每5分钟执行一次
# trigger=CronTrigger(minute='*/5')
try:
scheduler.start()
except KeyboardInterrupt:
print("\n定时任务已停止")
scheduler.shutdown()5.5 使用BackgroundScheduler实现后台执行
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import datetime
import time
# 创建后台调度器
scheduler = BackgroundScheduler()
def job():
"""定时执行的任务"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"后台任务执行时间: {current_time}")
print("执行一些后台定时任务...")
# 主程序
if __name__ == "__main__":
print("后台定时任务启动")
# 添加任务
scheduler.add_job(
job,
trigger=IntervalTrigger(seconds=5),
id="background_job",
name="后台间隔执行任务",
replace_existing=True
)
# 启动调度器
scheduler.start()
print("主程序继续执行其他任务...")
try:
# 主程序运行20秒
for i in range(20):
print(f"主程序执行中... {i+1}/20")
time.sleep(1)
except KeyboardInterrupt:
print("\n程序被中断")
finally:
# 关闭调度器
scheduler.shutdown()
print("后台调度器已关闭")
print("程序结束")6. 系统级定时任务
6.1 Linux/macOS系统使用cron
cron是Linux/macOS系统内置的定时任务工具,可以在系统级别调度任务。
cron语法:
* * * * * command
- - - - -
| | | | |
| | | | +--- 星期几 (0 - 7) (星期日=0或7)
| | | +----- 月份 (1 - 12)
| | +------- 日期 (1 - 31)
| +--------- 小时 (0 - 23)
+----------- 分钟 (0 - 59)示例:
# 每天10:30执行Python脚本
30 10 * * * python3 /path/to/your/script.py
# 每5分钟执行一次
*/5 * * * * python3 /path/to/your/script.py
# 每周一的14:00执行
0 14 * * 1 python3 /path/to/your/script.py使用方法:
- 打开终端
- 输入
crontab -e编辑cron任务 - 添加上述格式的任务
- 保存退出(按Ctrl+X,然后Y,然后Enter)
6.2 Windows系统使用任务计划程序
Windows系统内置了任务计划程序,可以创建和管理定时任务。
使用方法:
- 打开"控制面板" -> "管理工具" -> "任务计划程序"
- 点击"创建基本任务"
- 按照向导设置任务名称、触发器(时间、频率)、操作(启动程序)
- 在"程序/脚本"中选择python.exe的路径
- 在"添加参数"中输入脚本的完整路径
- 完成向导并启用任务
7. 实际应用案例
7.1 定时备份文件
import schedule
import time
import shutil
import os
import datetime
def backup_files():
"""定时备份文件"""
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
source_dir = "C:\\Users\\User\\Documents"
backup_dir = "C:\\Backup"
backup_file = f"{backup_dir}\\documents_backup_{current_time}.zip"
try:
# 创建备份目录
os.makedirs(backup_dir, exist_ok=True)
# 压缩并备份文件
shutil.make_archive(backup_file.replace(".zip", ""), "zip", source_dir)
print(f"✓ 备份成功: {backup_file}")
print(f"✓ 备份时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
print(f"✗ 备份失败: {e}")
# 主程序
if __name__ == "__main__":
print("文件备份定时任务启动(按Ctrl+C停止)")
# 每天23:00执行备份
schedule.every().day.at("23:00").do(backup_files)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n文件备份定时任务已停止")7.2 定时发送邮件
import schedule
import time
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
# 邮件配置
MAIL_CONFIG = {
"smtp_host": "smtp.example.com",
"smtp_port": 465,
"smtp_user": "your_email@example.com",
"smtp_pass": "your_password",
"sender": "your_email@example.com",
"receivers": ["user@example.com"]
}
def send_daily_report():
"""定时发送日报"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 邮件内容
subject = f"每日报告 - {datetime.datetime.now().strftime('%Y-%m-%d')}"
content = f"尊敬的用户:\n\n这是您的每日报告。\n\n报告生成时间:{current_time}\n\n祝您工作愉快!"
try:
# 创建邮件对象
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = Header("自动报告系统", 'utf-8')
message['To'] = Header("用户", 'utf-8')
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件
with smtplib.SMTP_SSL(MAIL_CONFIG["smtp_host"], MAIL_CONFIG["smtp_port"]) as smtp:
smtp.login(MAIL_CONFIG["smtp_user"], MAIL_CONFIG["smtp_pass"])
smtp.sendmail(MAIL_CONFIG["sender"], MAIL_CONFIG["receivers"], message.as_string())
print(f"✓ 日报发送成功: {subject}")
print(f"✓ 发送时间: {current_time}")
except Exception as e:
print(f"✗ 日报发送失败: {e}")
# 主程序
if __name__ == "__main__":
print("日报邮件定时任务启动(按Ctrl+C停止)")
# 每天上午9:00发送日报
schedule.every().day.at("09:00").do(send_daily_report)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n日报邮件定时任务已停止")7.3 定时数据采集
import schedule
import time
import requests
import json
import os
import datetime
def collect_data():
"""定时采集API数据"""
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# 请求API
response = requests.get("https://api.example.com/data")
response.raise_for_status() # 检查请求是否成功
# 解析数据
data = response.json()
# 保存数据
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)
file_name = f"{data_dir}\\data_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(file_name, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print(f"✓ 数据采集成功: {file_name}")
print(f"✓ 采集时间: {current_time}")
print(f"✓ 数据量: {len(data)}条")
except Exception as e:
print(f"✗ 数据采集失败: {e}")
# 主程序
if __name__ == "__main__":
print("数据采集定时任务启动(按Ctrl+C停止)")
# 每小时采集一次数据
schedule.every().hour.do(collect_data)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n数据采集定时任务已停止")8. 定时任务最佳实践
8.1 错误处理
- 对所有可能的错误进行捕获和处理
- 记录详细的日志信息
- 实现错误重试机制
import schedule
import time
import logging
import datetime
# 配置日志
logging.basicConfig(
filename='scheduler.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def job():
"""带错误处理的定时任务"""
try:
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"任务执行时间: {current_time}")
# 模拟可能出错的操作
result = 1 / 0 # 故意引发除零错误
logging.info("任务执行成功")
except Exception as e:
error_msg = f"任务执行失败: {e}"
print(f"✗ {error_msg}")
logging.error(error_msg)
# 主程序
if __name__ == "__main__":
print("带错误处理的定时任务启动(按Ctrl+C停止)")
schedule.every(3).seconds.do(job)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n定时任务已停止")8.2 任务持久化
- 使用数据库存储任务状态
- 记录任务执行历史
- 实现任务恢复机制
8.3 性能优化
- 避免在任务中执行长时间运行的操作
- 合理设置任务优先级
- 使用异步执行或多线程处理并发任务
8.4 安全性考虑
- 限制任务的执行权限
- 加密敏感信息(如数据库密码、API密钥)
- 验证任务的来源和完整性
8.5 可维护性
- 模块化设计:将不同功能封装为函数
- 参数化配置:使用配置文件或环境变量
- 详细文档:添加注释和使用说明
9. 总结
定时任务是自动化工作流程的重要组成部分,Python提供了多种实现方式,从简单的time.sleep()到复杂的APScheduler库,以及系统级的定时任务工具。
选择合适的实现方式:
- 简单间隔执行:使用
time.sleep()或schedule库 - 复杂调度需求:使用
APScheduler库 - 系统级稳定运行:使用cron或任务计划程序
通过学习本集内容,您可以根据实际需求选择合适的定时任务实现方式,并应用到各种自动化场景中,提高工作效率和系统稳定性。