第116集_线程同步:条件变量
学习目标
- 理解条件变量的概念和作用
- 掌握Python中条件变量的基本使用方法
- 学会使用条件变量实现生产者-消费者模式
- 比较条件变量与其他同步机制的优缺点
- 能够在实际项目中正确应用条件变量进行线程同步
一、条件变量的概念
1. 什么是条件变量?
条件变量是线程同步的一种高级机制,它允许线程在满足特定条件之前等待,当条件满足时被唤醒继续执行。
2. 条件变量的作用
- 解决线程间的协作问题,尤其是生产者-消费者模式
- 避免忙等待(busy waiting),提高系统资源利用率
- 实现更复杂的线程同步逻辑
3. 条件变量与锁的关系
条件变量总是与锁配合使用:
- 锁用于保护共享数据的访问
- 条件变量用于线程间的条件等待和唤醒
二、Python中的条件变量
1. threading.Condition 类
Python的threading模块提供了Condition类来实现条件变量:
import threading
# 创建条件变量(默认使用RLock)
cond = threading.Condition()
# 也可以显式指定锁
lock = threading.Lock()
cond = threading.Condition(lock)2. 主要方法
| 方法 | 说明 |
|---|---|
acquire() |
获取内部锁 |
release() |
释放内部锁 |
wait(timeout=None) |
等待条件满足,释放锁并进入阻塞状态,直到被notify或超时 |
notify() |
唤醒一个等待该条件的线程 |
notify_all() |
唤醒所有等待该条件的线程 |
3. 使用步骤
- 获取条件变量的内部锁
- 检查条件是否满足
- 如果条件不满足,调用
wait()释放锁并等待 - 当条件满足时,执行相应操作
- 通知其他等待的线程
- 释放内部锁
三、条件变量的基本使用
1. 简单示例
import threading
import time
# 共享资源
data = []
# 创建条件变量
condition = threading.Condition()
# 生产者线程
def producer():
with condition:
# 生产数据
data.append("生产的数据")
print("生产者:已生产数据")
# 通知消费者
condition.notify()
# 消费者线程
def consumer():
with condition:
# 如果没有数据,等待
while not data:
print("消费者:等待数据...")
condition.wait()
# 消费数据
item = data.pop()
print(f"消费者:已消费数据:{item}")
# 创建线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
# 启动线程
c.start() # 先启动消费者,让它等待
time.sleep(1) # 等待1秒
p.start()
# 等待线程结束
p.join()
c.join()2. 带超时的等待
import threading
import time
condition = threading.Condition()
# 等待线程
def waiter():
with condition:
print("等待线程:开始等待")
# 等待5秒超时
result = condition.wait(timeout=5)
if result:
print("等待线程:被唤醒")
else:
print("等待线程:超时")
# 唤醒线程
def notifier():
time.sleep(3) # 等待3秒
with condition:
print("唤醒线程:通知等待线程")
condition.notify()
w = threading.Thread(target=waiter)
n = threading.Thread(target=notifier)
w.start()
n.start()
w.join()
n.join()四、生产者-消费者模式
1. 什么是生产者-消费者模式?
生产者-消费者模式是一种经典的线程同步模式:
- 生产者线程负责生成数据并放入缓冲区
- 消费者线程负责从缓冲区取出数据并处理
- 缓冲区是共享资源,需要同步访问
2. 使用条件变量实现
import threading
import time
import random
# 共享缓冲区
buffer = []
MAX_BUFFER_SIZE = 5
# 创建条件变量
condition = threading.Condition()
# 生产者类
class Producer(threading.Thread):
def run(self):
global buffer
while True:
with condition:
# 如果缓冲区满了,等待
while len(buffer) == MAX_BUFFER_SIZE:
print(f"生产者{self.name}:缓冲区已满,等待消费者消费...")
condition.wait()
# 生产数据
item = random.randint(1, 100)
buffer.append(item)
print(f"生产者{self.name}:生产了 {item},当前缓冲区:{buffer}")
# 通知消费者
condition.notify_all()
# 模拟生产时间
time.sleep(random.uniform(0.5, 1.5))
# 消费者类
class Consumer(threading.Thread):
def run(self):
global buffer
while True:
with condition:
# 如果缓冲区空了,等待
while len(buffer) == 0:
print(f"消费者{self.name}:缓冲区为空,等待生产者生产...")
condition.wait()
# 消费数据
item = buffer.pop(0) # 从队首取出
print(f"消费者{self.name}:消费了 {item},当前缓冲区:{buffer}")
# 通知生产者
condition.notify_all()
# 模拟消费时间
time.sleep(random.uniform(0.5, 2.0))
# 创建线程
producers = [Producer(name=f"P{i+1}") for i in range(3)]
consumers = [Consumer(name=f"C{i+1}") for i in range(2)]
# 启动线程
for p in producers:
p.start()
for c in consumers:
c.start()
# 主线程等待
for p in producers:
p.join()
for c in consumers:
c.join()五、条件变量与其他同步机制的比较
| 同步机制 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 锁(Lock/RLock) | 保护共享资源 | 简单易用 | 无法实现条件等待 |
| 信号量(Semaphore) | 资源计数 | 控制并发数 | 无法精确控制条件 |
| 事件(Event) | 线程通知 | 简单的通知机制 | 无法传递复杂条件 |
| 条件变量(Condition) | 复杂条件等待 | 灵活控制条件 | 使用相对复杂 |
六、实践练习
练习1:简单的条件变量使用
编写一个程序,创建两个线程:
- 线程A:生成1-10的数字,每次生成后通知线程B
- 线程B:接收线程A的通知,打印接收到的数字
练习2:带缓冲区的生产者-消费者
实现一个带有固定大小缓冲区的生产者-消费者模式:
- 生产者线程生成随机字符串
- 消费者线程计算字符串长度并打印
- 缓冲区大小为3
练习3:多条件的条件变量
实现一个更复杂的场景:
- 有三种类型的线程:生产者、奇数消费者、偶数消费者
- 生产者生产数字
- 奇数消费者只消费奇数
- 偶数消费者只消费偶数
七、总结
- 条件变量是一种高级线程同步机制,用于线程间的条件等待和唤醒
- 条件变量必须与锁配合使用,用于保护共享数据
- 主要方法包括
wait()、notify()和notify_all() - 条件变量特别适合实现生产者-消费者模式
- 与其他同步机制相比,条件变量提供了更灵活的条件控制
八、课后思考
- 为什么条件变量的
wait()方法需要在循环中调用? notify()和notify_all()有什么区别?分别在什么情况下使用?- 条件变量如何避免忙等待?
- 在实际项目中,什么时候应该使用条件变量而不是其他同步机制?