第117集_线程池
学习目标
- 理解线程池的概念和设计思想
- 掌握Python中线程池的实现方法
- 学会使用
concurrent.futures.ThreadPoolExecutor进行并发编程 - 了解线程池的高级特性和最佳实践
- 能够在实际项目中正确应用线程池提高性能
一、线程池的概念
1. 什么是线程池?
线程池是一种线程管理机制,它预先创建一定数量的线程,放入池中,当有任务需要执行时,从池中取出空闲线程执行任务,任务完成后线程不会被销毁,而是放回池中等待下一个任务。
2. 线程池的设计思想
- 资源复用:避免频繁创建和销毁线程带来的性能开销
- 控制并发数量:防止系统资源耗尽
- 提高响应速度:任务到达时不需要等待线程创建
- 统一管理:便于监控和管理线程
3. 线程池的核心组件
- 线程池管理器:创建和管理线程池
- 工作线程:执行具体任务的线程
- 任务队列:存储等待执行的任务
- 任务接口:定义任务的执行方法
二、线程池的优势
1. 性能提升
- 避免线程创建和销毁的开销(每个线程创建约需1MB内存)
- 减少CPU上下文切换的次数
2. 资源管理
- 限制最大并发线程数,防止资源耗尽
- 统一分配、调优和监控线程资源
3. 可靠性
- 提供任务队列缓冲机制
- 支持任务执行结果的管理
- 可以优雅地处理异常
4. 易用性
- 简化并发编程模型
- 提供统一的接口
三、Python中的线程池实现
1. 早期的自定义实现
在Python 3.2之前,开发者需要自己实现线程池:
import threading
import queue
class ThreadPool:
def __init__(self, max_workers):
self.max_workers = max_workers
self.task_queue = queue.Queue()
self.workers = []
self.shutdown_flag = threading.Event()
# 创建工作线程
for i in range(max_workers):
worker = threading.Thread(target=self._worker)
self.workers.append(worker)
worker.start()
def _worker(self):
while not self.shutdown_flag.is_set() or not self.task_queue.empty():
try:
# 从队列获取任务,超时1秒
task, args, kwargs, callback = self.task_queue.get(timeout=1)
try:
result = task(*args, **kwargs)
if callback:
callback(result)
except Exception as e:
print(f"任务执行出错: {e}")
finally:
self.task_queue.task_done()
except queue.Empty:
continue
def submit(self, task, *args, callback=None, **kwargs):
self.task_queue.put((task, args, kwargs, callback))
def shutdown(self, wait=True):
self.shutdown_flag.set()
if wait:
for worker in self.workers:
worker.join()2. concurrent.futures.ThreadPoolExecutor
Python 3.2引入了concurrent.futures模块,提供了更简洁易用的线程池实现:
from concurrent.futures import ThreadPoolExecutor
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
future = executor.submit(func, args)
# 获取结果
result = future.result()四、ThreadPoolExecutor的基本使用
1. 创建线程池
from concurrent.futures import ThreadPoolExecutor
# 创建线程池,最多5个线程
pool = ThreadPoolExecutor(max_workers=5)
# 使用上下文管理器(推荐)
with ThreadPoolExecutor(max_workers=5) as executor:
# 执行任务
pass2. 提交单个任务
import time
from concurrent.futures import ThreadPoolExecutor
# 定义任务函数
def task(n):
print(f"任务 {n} 开始执行")
time.sleep(1)
print(f"任务 {n} 执行完成")
return n * n
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务,返回Future对象
future = executor.submit(task, 5)
print("任务已提交,主线程继续执行")
# 获取任务结果(阻塞直到任务完成)
result = future.result()
print(f"任务执行结果: {result}")3. 批量提交任务
import time
from concurrent.futures import ThreadPoolExecutor
# 定义任务函数
def task(n):
print(f"任务 {n} 开始执行")
time.sleep(1)
return n * n
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 使用map方法批量提交任务
results = list(executor.map(task, range(10)))
print(f"所有任务执行完成,结果: {results}")五、线程池的高级特性
1. Future对象
Future对象表示异步操作的结果,提供了以下方法:
| 方法 | 说明 |
|---|---|
result(timeout=None) |
获取任务结果,可选超时时间 |
done() |
检查任务是否完成 |
cancel() |
取消任务(仅当任务未开始执行时有效) |
running() |
检查任务是否正在执行 |
add_done_callback(fn) |
为任务添加回调函数 |
2. 回调函数
import time
from concurrent.futures import ThreadPoolExecutor
# 定义任务函数
def task(n):
time.sleep(1)
return n * n
# 定义回调函数
def callback(future):
result = future.result()
print(f"回调函数: 任务结果是 {result}")
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 10)
# 添加回调函数
future.add_done_callback(callback)
print("主线程继续执行")
# 等待所有任务完成
executor.shutdown(wait=True)3. 异常处理
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
if n == 5:
raise ValueError("任务5执行出错")
time.sleep(0.5)
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交所有任务
futures = [executor.submit(task, i) for i in range(10)]
# 处理完成的任务
for future in as_completed(futures):
try:
result = future.result()
print(f"任务执行成功: {result}")
except Exception as e:
print(f"任务执行出错: {e}")4. 超时处理
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def long_task():
time.sleep(5)
return "任务完成"
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(long_task)
try:
# 等待最多3秒
result = future.result(timeout=3)
print(f"任务结果: {result}")
except TimeoutError:
print("任务超时")六、线程池的最佳实践
1. 合理设置线程池大小
线程池大小的设置需要考虑以下因素:
- CPU核心数
- 任务的I/O密集程度
- 系统资源限制
经验公式:
- I/O密集型任务:线程数 = CPU核心数 × 2
- CPU密集型任务:线程数 = CPU核心数 + 1
import os
from concurrent.futures import ThreadPoolExecutor
# 获取CPU核心数
cpu_count = os.cpu_count()
print(f"CPU核心数: {cpu_count}")
# I/O密集型任务的线程池
io_pool = ThreadPoolExecutor(max_workers=cpu_count * 2)
# CPU密集型任务的线程池
cpu_pool = ThreadPoolExecutor(max_workers=cpu_count + 1)2. 使用上下文管理器
# 推荐:使用上下文管理器自动关闭线程池
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(task, range(10))
# 不推荐:手动管理线程池生命周期
pool = ThreadPoolExecutor(max_workers=5)
try:
results = pool.map(task, range(10))
finally:
pool.shutdown()3. 避免共享状态
# 不推荐:共享全局变量
shared_data = []
def task(n):
shared_data.append(n) # 线程不安全
return n
# 推荐:使用局部变量或线程安全的数据结构
from queue import Queue
task_queue = Queue()
def safe_task(n):
result = n * 2
task_queue.put(result) # 线程安全
return result4. 监控线程池
from concurrent.futures import ThreadPoolExecutor
import time
class MonitoredThreadPoolExecutor(ThreadPoolExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tasks_submitted = 0
self.tasks_completed = 0
def submit(self, fn, *args, **kwargs):
self.tasks_submitted += 1
future = super().submit(fn, *args, **kwargs)
future.add_done_callback(lambda f: self._task_completed())
return future
def _task_completed(self):
self.tasks_completed += 1
print(f"进度: {self.tasks_completed}/{self.tasks_submitted}")
# 使用监控线程池
with MonitoredThreadPoolExecutor(max_workers=3) as executor:
executor.map(lambda x: time.sleep(0.5), range(10))七、线程池的实际应用场景
1. 网络请求并发处理
import requests
from concurrent.futures import ThreadPoolExecutor
# 定义请求函数
def fetch_url(url):
response = requests.get(url)
return url, response.status_code, len(response.content)
# 要请求的URL列表
urls = [
"https://www.baidu.com",
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
"https://www.csdn.net"
]
# 使用线程池并发请求
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(fetch_url, urls))
# 输出结果
for url, status_code, content_length in results:
print(f"URL: {url}, 状态码: {status_code}, 内容长度: {content_length}")2. 文件批量处理
import os
import time
from concurrent.futures import ThreadPoolExecutor
# 定义文件处理函数
def process_file(file_path):
print(f"开始处理文件: {file_path}")
time.sleep(1) # 模拟文件处理
with open(file_path, 'r') as f:
content = f.read()
line_count = len(content.splitlines())
print(f"完成处理文件: {file_path}, 行数: {line_count}")
return file_path, line_count
# 获取所有文本文件
file_list = [f for f in os.listdir('.') if f.endswith('.txt')]
# 使用线程池批量处理
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(process_file, file_list))
# 输出结果
print("\n处理结果:")
for file_path, line_count in results:
print(f"{file_path}: {line_count}行")八、线程池与其他并发机制的比较
| 并发机制 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 线程池 | 资源复用、控制并发数、提高响应速度 | 实现相对复杂 | 大量短期任务 |
| 普通线程 | 实现简单 | 频繁创建销毁开销大 | 少量长期任务 |
| 多进程 | 充分利用多核CPU | 内存占用大、通信复杂 | CPU密集型任务 |
| 协程 | 开销极小、切换快 | 需要特殊支持 | I/O密集型任务 |
九、实践练习
练习1:并发下载图片
编写一个程序,使用线程池并发下载多张图片:
- 定义图片URL列表
- 编写下载函数
- 使用线程池并发下载
- 计算下载时间和成功率
练习2:并行计算
使用线程池计算1到1000000的质数个数:
- 编写判断质数的函数
- 将数字范围分成多个子任务
- 使用线程池并行计算
- 合并结果
练习3:异步Web请求
使用线程池和回调函数实现异步Web请求:
- 使用
requests库发送HTTP请求 - 为每个请求添加回调函数
- 在回调函数中处理响应数据
- 统计请求结果
十、总结
- 线程池是一种高效的线程管理机制,通过预先创建线程并复用,避免了频繁创建和销毁线程的开销
- Python的
concurrent.futures.ThreadPoolExecutor提供了简洁易用的线程池实现 - 线程池的核心方法包括
submit()和map(),用于提交任务 Future对象表示异步操作的结果,提供了获取结果、添加回调等方法- 使用线程池时需要注意合理设置线程数、避免共享状态、使用上下文管理器等最佳实践
- 线程池适用于处理大量短期任务,特别是I/O密集型任务
十一、课后思考
- 线程池的最大线程数应该如何确定?
submit()和map()方法有什么区别?分别适用于什么场景?- 为什么说线程池特别适合处理I/O密集型任务?
- 如何处理线程池中的任务异常?
- 线程池与进程池的区别是什么?分别适用于什么场景?