第114集:线程同步:信号量
学习目标
- 理解信号量(Semaphore)的概念和工作原理
- 掌握信号量与锁(Lock)的区别和联系
- 学会使用Python中的Semaphore类实现线程同步
- 理解计数信号量和二进制信号量的概念
- 掌握信号量在实际项目中的应用场景
- 学会使用信号量解决资源限制和并发控制问题
一、信号量的基本概念
1.1 什么是信号量
信号量(Semaphore)是一种用于控制对共享资源访问的线程同步机制,它允许多个线程同时访问共享资源,但限制了同时访问的最大线程数量。
信号量维护了一个计数器,该计数器表示当前可用资源的数量。当线程需要访问资源时,它会尝试获取信号量(减少计数器);当线程完成资源使用时,它会释放信号量(增加计数器)。
1.2 信号量的工作原理
信号量的工作原理可以简单概括为:
- 初始化信号量时,设置一个初始计数器值
- 当线程需要访问资源时,调用
acquire()方法:- 如果计数器大于0,计数器减1,线程继续执行
- 如果计数器等于0,线程阻塞,等待其他线程释放资源
- 当线程完成资源使用时,调用
release()方法:- 计数器加1
- 如果有线程在等待,唤醒其中一个线程
1.3 信号量与锁的区别
| 特性 | 锁(Lock) | 信号量(Semaphore) |
|---|---|---|
| 资源访问数量 | 1个线程 | 多个线程(可配置) |
| 资源类型 | 互斥资源 | 共享资源池 |
| 基本操作 | acquire()/release() | acquire()/release() |
| 计数器初始值 | 1 | 可配置(≥0) |
| 适用场景 | 保护临界区 | 限制并发访问数量 |
简单来说,锁是信号量的一种特殊情况(二进制信号量,计数器初始值为1)。
二、Python中的信号量实现
2.1 threading.Semaphore类
Python的threading模块提供了Semaphore类,用于实现信号量机制。
基本使用语法:
import threading
# 创建信号量,初始值为3
semaphore = threading.Semaphore(3)
# 获取信号量
semaphore.acquire()
# 访问共享资源
# ...
# 释放信号量
semaphore.release()2.2 计数信号量
计数信号量(Counting Semaphore)允许指定数量的线程同时访问共享资源。例如,如果我们将信号量的初始值设置为3,那么最多只能有3个线程同时访问资源。
import threading
import time
# 创建信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
# 工作函数
def worker(name):
print(f"线程 {name} 尝试获取信号量")
semaphore.acquire()
try:
print(f"线程 {name} 已获取信号量,开始工作")
time.sleep(2) # 模拟工作
print(f"线程 {name} 工作完成")
finally:
semaphore.release()
print(f"线程 {name} 已释放信号量")
# 创建5个线程
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()执行结果:
线程 0 尝试获取信号量
线程 0 已获取信号量,开始工作
线程 1 尝试获取信号量
线程 1 已获取信号量,开始工作
线程 2 尝试获取信号量
线程 2 已获取信号量,开始工作
线程 3 尝试获取信号量
线程 4 尝试获取信号量
线程 0 工作完成
线程 0 已释放信号量
线程 3 已获取信号量,开始工作
线程 1 工作完成
线程 1 已释放信号量
线程 4 已获取信号量,开始工作
线程 2 工作完成
线程 2 已释放信号量
线程 3 工作完成
线程 3 已释放信号量
线程 4 工作完成
线程 4 已释放信号量2.3 二进制信号量
二进制信号量(Binary Semaphore)是计数信号量的一种特殊情况,其初始值为1。它的行为类似于锁,但有一些细微的区别:
- 二进制信号量可以由一个线程获取,另一个线程释放
- 锁必须由获取它的线程释放
import threading
# 创建二进制信号量(初始值为1)
binary_semaphore = threading.Semaphore(1)
# 使用方式与锁类似
binary_semaphore.acquire()
try:
# 访问临界区
pass
finally:
binary_semaphore.release()三、信号量的应用场景
3.1 资源池管理
信号量常用于管理资源池,如数据库连接池、线程池等。
示例:数据库连接池
import threading
import time
import random
class ConnectionPool:
def __init__(self, size):
self.size = size
self.connections = [f"Connection-{i}" for i in range(size)]
self.semaphore = threading.Semaphore(size) # 信号量初始值等于连接池大小
self.lock = threading.Lock() # 保护连接列表的锁
def get_connection(self):
# 获取信号量(限制并发访问数量)
self.semaphore.acquire()
# 获取连接(需要锁保护)
with self.lock:
connection = self.connections.pop()
print(f"获取连接: {connection}")
return connection
def release_connection(self, connection):
# 释放连接(需要锁保护)
with self.lock:
self.connections.append(connection)
print(f"释放连接: {connection}")
# 释放信号量
self.semaphore.release()
# 创建连接池(大小为3)
pool = ConnectionPool(3)
# 工作函数
def worker(name):
# 获取连接
connection = pool.get_connection()
try:
# 使用连接(模拟数据库操作)
print(f"线程 {name} 使用连接 {connection} 进行数据库操作")
time.sleep(random.uniform(1, 3)) # 模拟操作时间
finally:
# 释放连接
pool.release_connection(connection)
# 创建8个线程,模拟并发访问
threads = []
for i in range(8):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()3.2 限制并发访问数量
信号量可以用于限制同时访问某个资源的线程数量,例如限制同时下载的文件数量、限制API请求的并发数等。
示例:限制并发下载数量
import threading
import time
import random
# 创建信号量,限制最多3个并发下载
download_semaphore = threading.Semaphore(3)
# 下载函数
def download_file(file_name):
print(f"尝试下载文件: {file_name}")
# 获取信号量(限制并发数)
download_semaphore.acquire()
try:
print(f"开始下载文件: {file_name}")
# 模拟下载过程
download_time = random.uniform(2, 5)
time.sleep(download_time)
print(f"文件下载完成: {file_name},耗时 {download_time:.2f} 秒")
finally:
# 释放信号量
download_semaphore.release()
# 要下载的文件列表
files = [
"file1.mp4", "file2.mp4", "file3.mp4", "file4.mp4",
"file5.mp4", "file6.mp4", "file7.mp4", "file8.mp4"
]
# 创建线程下载所有文件
threads = []
for file in files:
thread = threading.Thread(target=download_file, args=(file,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("所有文件下载完成!")3.3 生产者-消费者模式
信号量也可以用于实现生产者-消费者模式,控制生产者和消费者的速度。
示例:使用信号量实现生产者-消费者
import threading
import time
import random
from queue import Queue
# 共享队列
queue = Queue(maxsize=5)
# 信号量
empty = threading.Semaphore(5) # 初始值为队列容量,表示空位置数量
full = threading.Semaphore(0) # 初始值为0,表示满位置数量
# 生产者函数
def producer(name):
for i in range(10):
# 生产数据
item = f"{name}-{i}"
print(f"生产者 {name} 生产: {item}")
# 获取空位置信号量
empty.acquire()
# 将数据放入队列
queue.put(item)
print(f"生产者 {name} 将 {item} 放入队列")
# 释放满位置信号量
full.release()
# 模拟生产时间
time.sleep(random.uniform(0.5, 1.5))
# 消费者函数
def consumer(name):
for i in range(10):
# 获取满位置信号量
full.acquire()
# 从队列取出数据
item = queue.get()
print(f"消费者 {name} 从队列取出: {item}")
# 消费数据
print(f"消费者 {name} 消费: {item}")
# 释放空位置信号量
empty.release()
# 模拟消费时间
time.sleep(random.uniform(0.5, 2.0))
# 创建生产者和消费者线程
producers = [
threading.Thread(target=producer, args=("P1",)),
threading.Thread(target=producer, args=("P2",))
]
consumers = [
threading.Thread(target=consumer, args=("C1",)),
threading.Thread(target=consumer, args=("C2",))
]
# 启动线程
for p in producers:
p.start()
for c in consumers:
c.start()
# 等待所有线程完成
for p in producers:
p.join()
for c in consumers:
c.join()
print("所有生产和消费完成!")四、信号量的注意事项
4.1 信号量的初始值设置
信号量的初始值应该根据实际需求设置:
- 如果是限制并发访问数量,初始值应设置为允许的最大并发数
- 如果是资源池管理,初始值应设置为资源池的大小
- 如果是二进制信号量,初始值应设置为1
4.2 信号量的获取和释放
- 总是在
try...finally块中使用信号量,确保无论是否发生异常,信号量都会被释放 - 避免在信号量保护的临界区内执行耗时过长的操作,以免影响其他线程
- 不要忘记释放信号量,否则会导致资源泄漏和死锁
4.3 信号量与其他同步机制的结合
信号量可以与其他线程同步机制(如锁、条件变量等)结合使用,以实现更复杂的同步需求。
五、信号量的常见应用案例
5.1 Web服务器连接限制
Web服务器使用信号量来限制同时处理的请求数量,防止服务器过载。
import threading
import time
# 限制最多同时处理5个请求
request_semaphore = threading.Semaphore(5)
def handle_request(request_id):
print(f"收到请求: {request_id}")
# 获取信号量
request_semaphore.acquire()
try:
print(f"开始处理请求: {request_id}")
# 模拟处理时间
time.sleep(2)
print(f"请求处理完成: {request_id}")
finally:
# 释放信号量
request_semaphore.release()
# 模拟20个请求
for i in range(20):
threading.Thread(target=handle_request, args=(i,)).start()
time.sleep(0.5) # 模拟请求间隔5.2 数据库连接池
数据库连接池使用信号量来管理数据库连接的分配和回收。
import threading
import time
class DBPool:
def __init__(self, size):
self.size = size
self.connections = [f"DB-Conn-{i}" for i in range(size)]
self.semaphore = threading.Semaphore(size)
self.lock = threading.Lock()
def get_connection(self):
self.semaphore.acquire()
with self.lock:
return self.connections.pop()
def release_connection(self, conn):
with self.lock:
self.connections.append(conn)
self.semaphore.release()
# 创建连接池(大小为3)
db_pool = DBPool(3)
def execute_query(thread_id):
conn = db_pool.get_connection()
print(f"线程 {thread_id} 使用连接 {conn} 执行查询")
time.sleep(1) # 模拟查询时间
db_pool.release_connection(conn)
print(f"线程 {thread_id} 释放连接 {conn}")
# 10个线程同时查询
for i in range(10):
threading.Thread(target=execute_query, args=(i,)).start()5.3 线程池实现
线程池使用信号量来控制活跃线程的数量。
import threading
import time
import queue
class ThreadPool:
def __init__(self, size):
self.size = size
self.task_queue = queue.Queue()
self.semaphore = threading.Semaphore(size)
self.workers = []
self.running = True
# 创建工作线程
for i in range(size):
worker = threading.Thread(target=self.worker, args=(i,))
self.workers.append(worker)
worker.start()
def worker(self, worker_id):
while self.running:
try:
# 等待任务
task, args, kwargs = self.task_queue.get(timeout=1)
# 执行任务
print(f"工作线程 {worker_id} 执行任务")
task(*args, **kwargs)
# 标记任务完成
self.task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"工作线程 {worker_id} 发生错误: {e}")
def submit(self, task, *args, **kwargs):
"""提交任务到线程池"""
self.task_queue.put((task, args, kwargs))
def shutdown(self):
"""关闭线程池"""
self.running = False
for worker in self.workers:
worker.join()
# 测试函数
def task(name):
print(f"开始执行任务: {name}")
time.sleep(1) # 模拟任务执行时间
print(f"任务执行完成: {name}")
# 创建线程池(大小为3)
pool = ThreadPool(3)
# 提交10个任务
for i in range(10):
pool.submit(task, f"Task-{i}")
# 等待所有任务完成
pool.task_queue.join()
# 关闭线程池
pool.shutdown()
print("所有任务完成!")六、总结
6.1 信号量的优缺点
优点:
- 灵活控制并发访问数量
- 适用于资源池管理
- 可以实现复杂的同步模式
- 支持多个线程同时访问资源
缺点:
- 使用不当容易导致死锁
- 比锁的实现更复杂
- 计数器管理需要额外的开销
6.2 信号量的最佳实践
- 根据实际需求合理设置信号量的初始值
- 始终在
try...finally块中使用信号量 - 避免在信号量保护的临界区内执行耗时操作
- 结合其他同步机制使用,实现更复杂的同步需求
- 定期检查信号量的使用情况,避免资源泄漏
七、课后练习
练习1:实现一个简单的连接池
- 使用信号量实现一个容量为5的连接池
- 连接池应支持获取连接和释放连接的操作
- 创建10个线程同时使用连接池,验证连接池的正确性
练习2:限制并发下载数量
- 使用信号量限制最多同时下载3个文件
- 模拟下载10个文件,每个文件下载时间为1-3秒
- 打印下载的开始和完成信息,验证并发限制的正确性
练习3:使用信号量实现生产者-消费者模式
- 创建一个容量为10的队列
- 实现2个生产者和3个消费者
- 使用信号量控制生产者和消费者的速度
- 生产者每秒生产1个数据,消费者每秒消费2个数据
练习4:Web服务器请求限制
- 实现一个简单的Web服务器请求处理函数
- 使用信号量限制最多同时处理10个请求
- 模拟20个并发请求,验证请求限制的正确性
八、学习资源推荐
- Python官方文档:https://docs.python.org/3/library/threading.html#semaphore-objects
- 《Python并发编程》(O'Reilly)
- 《深入理解计算机系统》(机械工业出版社)- 关于信号量的理论基础
- 在线课程:Python并发编程实战(慕课网)
九、下集预告
下一集我们将学习线程同步的另一种机制:事件(Event)。事件用于线程间的通信,可以让一个线程等待另一个线程发送信号。我们将学习事件的概念、工作原理和Python实现,并通过实际案例展示事件的应用。
敬请期待第115集:线程同步:事件!