第117集_线程池

学习目标

  1. 理解线程池的概念和设计思想
  2. 掌握Python中线程池的实现方法
  3. 学会使用concurrent.futures.ThreadPoolExecutor进行并发编程
  4. 了解线程池的高级特性和最佳实践
  5. 能够在实际项目中正确应用线程池提高性能

一、线程池的概念

1. 什么是线程池?

线程池是一种线程管理机制,它预先创建一定数量的线程,放入池中,当有任务需要执行时,从池中取出空闲线程执行任务,任务完成后线程不会被销毁,而是放回池中等待下一个任务。

2. 线程池的设计思想

  • 资源复用:避免频繁创建和销毁线程带来的性能开销
  • 控制并发数量:防止系统资源耗尽
  • 提高响应速度:任务到达时不需要等待线程创建
  • 统一管理:便于监控和管理线程

3. 线程池的核心组件

  • 线程池管理器:创建和管理线程池
  • 工作线程:执行具体任务的线程
  • 任务队列:存储等待执行的任务
  • 任务接口:定义任务的执行方法

二、线程池的优势

1. 性能提升

  • 避免线程创建和销毁的开销(每个线程创建约需1MB内存)
  • 减少CPU上下文切换的次数

2. 资源管理

  • 限制最大并发线程数,防止资源耗尽
  • 统一分配、调优和监控线程资源

3. 可靠性

  • 提供任务队列缓冲机制
  • 支持任务执行结果的管理
  • 可以优雅地处理异常

4. 易用性

  • 简化并发编程模型
  • 提供统一的接口

三、Python中的线程池实现

1. 早期的自定义实现

在Python 3.2之前,开发者需要自己实现线程池:

import threading
import queue

class ThreadPool:
    def __init__(self, max_workers):
        self.max_workers = max_workers
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown_flag = threading.Event()
        
        # 创建工作线程
        for i in range(max_workers):
            worker = threading.Thread(target=self._worker)
            self.workers.append(worker)
            worker.start()
    
    def _worker(self):
        while not self.shutdown_flag.is_set() or not self.task_queue.empty():
            try:
                # 从队列获取任务,超时1秒
                task, args, kwargs, callback = self.task_queue.get(timeout=1)
                try:
                    result = task(*args, **kwargs)
                    if callback:
                        callback(result)
                except Exception as e:
                    print(f"任务执行出错: {e}")
                finally:
                    self.task_queue.task_done()
            except queue.Empty:
                continue
    
    def submit(self, task, *args, callback=None, **kwargs):
        self.task_queue.put((task, args, kwargs, callback))
    
    def shutdown(self, wait=True):
        self.shutdown_flag.set()
        if wait:
            for worker in self.workers:
                worker.join()

2. concurrent.futures.ThreadPoolExecutor

Python 3.2引入了concurrent.futures模块,提供了更简洁易用的线程池实现:

from concurrent.futures import ThreadPoolExecutor

# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务
    future = executor.submit(func, args)
    # 获取结果
    result = future.result()

四、ThreadPoolExecutor的基本使用

1. 创建线程池

from concurrent.futures import ThreadPoolExecutor

# 创建线程池,最多5个线程
pool = ThreadPoolExecutor(max_workers=5)

# 使用上下文管理器(推荐)
with ThreadPoolExecutor(max_workers=5) as executor:
    # 执行任务
    pass

2. 提交单个任务

import time
from concurrent.futures import ThreadPoolExecutor

# 定义任务函数
def task(n):
    print(f"任务 {n} 开始执行")
    time.sleep(1)
    print(f"任务 {n} 执行完成")
    return n * n

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务,返回Future对象
    future = executor.submit(task, 5)
    
    print("任务已提交,主线程继续执行")
    
    # 获取任务结果(阻塞直到任务完成)
    result = future.result()
    print(f"任务执行结果: {result}")

3. 批量提交任务

import time
from concurrent.futures import ThreadPoolExecutor

# 定义任务函数
def task(n):
    print(f"任务 {n} 开始执行")
    time.sleep(1)
    return n * n

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    # 使用map方法批量提交任务
    results = list(executor.map(task, range(10)))
    print(f"所有任务执行完成,结果: {results}")

五、线程池的高级特性

1. Future对象

Future对象表示异步操作的结果,提供了以下方法:

方法 说明
result(timeout=None) 获取任务结果,可选超时时间
done() 检查任务是否完成
cancel() 取消任务(仅当任务未开始执行时有效)
running() 检查任务是否正在执行
add_done_callback(fn) 为任务添加回调函数

2. 回调函数

import time
from concurrent.futures import ThreadPoolExecutor

# 定义任务函数
def task(n):
    time.sleep(1)
    return n * n

# 定义回调函数
def callback(future):
    result = future.result()
    print(f"回调函数: 任务结果是 {result}")

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 10)
    # 添加回调函数
    future.add_done_callback(callback)
    
    print("主线程继续执行")
    # 等待所有任务完成
    executor.shutdown(wait=True)

3. 异常处理

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    if n == 5:
        raise ValueError("任务5执行出错")
    time.sleep(0.5)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交所有任务
    futures = [executor.submit(task, i) for i in range(10)]
    
    # 处理完成的任务
    for future in as_completed(futures):
        try:
            result = future.result()
            print(f"任务执行成功: {result}")
        except Exception as e:
            print(f"任务执行出错: {e}")

4. 超时处理

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def long_task():
    time.sleep(5)
    return "任务完成"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_task)
    
    try:
        # 等待最多3秒
        result = future.result(timeout=3)
        print(f"任务结果: {result}")
    except TimeoutError:
        print("任务超时")

六、线程池的最佳实践

1. 合理设置线程池大小

线程池大小的设置需要考虑以下因素:

  • CPU核心数
  • 任务的I/O密集程度
  • 系统资源限制

经验公式

  • I/O密集型任务:线程数 = CPU核心数 × 2
  • CPU密集型任务:线程数 = CPU核心数 + 1
import os
from concurrent.futures import ThreadPoolExecutor

# 获取CPU核心数
cpu_count = os.cpu_count()
print(f"CPU核心数: {cpu_count}")

# I/O密集型任务的线程池
io_pool = ThreadPoolExecutor(max_workers=cpu_count * 2)

# CPU密集型任务的线程池
cpu_pool = ThreadPoolExecutor(max_workers=cpu_count + 1)

2. 使用上下文管理器

# 推荐:使用上下文管理器自动关闭线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(task, range(10))

# 不推荐:手动管理线程池生命周期
pool = ThreadPoolExecutor(max_workers=5)
try:
    results = pool.map(task, range(10))
finally:
    pool.shutdown()

3. 避免共享状态

# 不推荐:共享全局变量
shared_data = []

def task(n):
    shared_data.append(n)  # 线程不安全
    return n

# 推荐:使用局部变量或线程安全的数据结构
from queue import Queue

task_queue = Queue()

def safe_task(n):
    result = n * 2
    task_queue.put(result)  # 线程安全
    return result

4. 监控线程池

from concurrent.futures import ThreadPoolExecutor
import time

class MonitoredThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tasks_submitted = 0
        self.tasks_completed = 0
    
    def submit(self, fn, *args, **kwargs):
        self.tasks_submitted += 1
        future = super().submit(fn, *args, **kwargs)
        future.add_done_callback(lambda f: self._task_completed())
        return future
    
    def _task_completed(self):
        self.tasks_completed += 1
        print(f"进度: {self.tasks_completed}/{self.tasks_submitted}")

# 使用监控线程池
with MonitoredThreadPoolExecutor(max_workers=3) as executor:
    executor.map(lambda x: time.sleep(0.5), range(10))

七、线程池的实际应用场景

1. 网络请求并发处理

import requests
from concurrent.futures import ThreadPoolExecutor

# 定义请求函数
def fetch_url(url):
    response = requests.get(url)
    return url, response.status_code, len(response.content)

# 要请求的URL列表
urls = [
    "https://www.baidu.com",
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
    "https://www.csdn.net"
]

# 使用线程池并发请求
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))

# 输出结果
for url, status_code, content_length in results:
    print(f"URL: {url}, 状态码: {status_code}, 内容长度: {content_length}")

2. 文件批量处理

import os
import time
from concurrent.futures import ThreadPoolExecutor

# 定义文件处理函数
def process_file(file_path):
    print(f"开始处理文件: {file_path}")
    time.sleep(1)  # 模拟文件处理
    with open(file_path, 'r') as f:
        content = f.read()
    line_count = len(content.splitlines())
    print(f"完成处理文件: {file_path}, 行数: {line_count}")
    return file_path, line_count

# 获取所有文本文件
file_list = [f for f in os.listdir('.') if f.endswith('.txt')]

# 使用线程池批量处理
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(process_file, file_list))

# 输出结果
print("\n处理结果:")
for file_path, line_count in results:
    print(f"{file_path}: {line_count}行")

八、线程池与其他并发机制的比较

并发机制 优点 缺点 适用场景
线程池 资源复用、控制并发数、提高响应速度 实现相对复杂 大量短期任务
普通线程 实现简单 频繁创建销毁开销大 少量长期任务
多进程 充分利用多核CPU 内存占用大、通信复杂 CPU密集型任务
协程 开销极小、切换快 需要特殊支持 I/O密集型任务

九、实践练习

练习1:并发下载图片

编写一个程序,使用线程池并发下载多张图片:

  1. 定义图片URL列表
  2. 编写下载函数
  3. 使用线程池并发下载
  4. 计算下载时间和成功率

练习2:并行计算

使用线程池计算1到1000000的质数个数:

  1. 编写判断质数的函数
  2. 将数字范围分成多个子任务
  3. 使用线程池并行计算
  4. 合并结果

练习3:异步Web请求

使用线程池和回调函数实现异步Web请求:

  1. 使用requests库发送HTTP请求
  2. 为每个请求添加回调函数
  3. 在回调函数中处理响应数据
  4. 统计请求结果

十、总结

  1. 线程池是一种高效的线程管理机制,通过预先创建线程并复用,避免了频繁创建和销毁线程的开销
  2. Python的concurrent.futures.ThreadPoolExecutor提供了简洁易用的线程池实现
  3. 线程池的核心方法包括submit()map(),用于提交任务
  4. Future对象表示异步操作的结果,提供了获取结果、添加回调等方法
  5. 使用线程池时需要注意合理设置线程数、避免共享状态、使用上下文管理器等最佳实践
  6. 线程池适用于处理大量短期任务,特别是I/O密集型任务

十一、课后思考

  1. 线程池的最大线程数应该如何确定?
  2. submit()map()方法有什么区别?分别适用于什么场景?
  3. 为什么说线程池特别适合处理I/O密集型任务?
  4. 如何处理线程池中的任务异常?
  5. 线程池与进程池的区别是什么?分别适用于什么场景?
« 上一篇 线程同步:条件变量 下一篇 » 多进程编程基础