第113集:线程同步:锁机制
学习目标
- 理解线程同步的概念和必要性
- 掌握线程安全问题的产生原因和解决方案
- 学会使用Python中的Lock类实现线程同步
- 理解RLock(可重入锁)的概念和使用场景
- 掌握死锁的概念、产生条件和预防方法
- 学会在实际项目中合理使用锁机制
一、线程同步的概念
1.1 什么是线程同步
线程同步(Thread Synchronization)是指协调多个线程之间的执行顺序,确保它们按照预期的方式访问共享资源,避免数据不一致或竞态条件。
在多线程环境中,当多个线程同时访问共享资源时,可能会导致数据不一致的问题。线程同步机制就是用来解决这些问题的。
1.2 为什么需要线程同步
考虑以下情况:两个线程同时更新同一个银行账户的余额。
import threading
import time
# 银行账户类
class BankAccount:
def __init__(self, balance):
self.balance = balance
def deposit(self, amount):
"""存款操作"""
print(f"开始存款: {amount}")
time.sleep(0.01) # 模拟网络延迟
self.balance += amount
print(f"存款完成: {amount}, 当前余额: {self.balance}")
def withdraw(self, amount):
"""取款操作"""
print(f"开始取款: {amount}")
time.sleep(0.01) # 模拟网络延迟
if self.balance >= amount:
self.balance -= amount
print(f"取款完成: {amount}, 当前余额: {self.balance}")
else:
print(f"余额不足,取款失败")
# 创建银行账户实例
account = BankAccount(1000)
# 定义存款和取款函数
def deposit_task():
for _ in range(5):
account.deposit(100)
def withdraw_task():
for _ in range(5):
account.withdraw(200)
# 创建并启动线程
deposit_thread = threading.Thread(target=deposit_task)
withdraw_thread = threading.Thread(target=withdraw_task)
deposit_thread.start()
withdraw_thread.start()
# 等待线程完成
deposit_thread.join()
withdraw_thread.join()
print(f"最终余额: {account.balance}")执行结果(可能):
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 1100
取款完成: 200, 当前余额: 900
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 1000
取款完成: 200, 当前余额: 800
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 900
取款完成: 200, 当前余额: 700
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 800
取款完成: 200, 当前余额: 600
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 700
取款完成: 200, 当前余额: 500
最终余额: 500分析:
- 初始余额:1000
- 存款操作:5次 × 100 = 500
- 取款操作:5次 × 200 = 1000
- 预期最终余额:1000 + 500 - 1000 = 500
在这个例子中,最终余额是正确的,但这只是巧合。在更复杂的情况下,可能会出现数据不一致的问题。
二、线程安全问题
2.1 竞态条件
竞态条件(Race Condition)是指当多个线程同时访问共享资源时,最终结果取决于线程执行的时序。
考虑以下简单的计数器例子:
import threading
# 共享变量
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1
# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print(f"最终计数器值: {counter}")执行结果(可能):
最终计数器值: 156789预期结果:
最终计数器值: 200000问题分析:counter += 1看似是一个原子操作,但实际上它包含三个步骤:
- 读取
counter的当前值 - 将值加1
- 将新值写回
counter
当两个线程同时执行这个操作时,可能会出现以下情况:
- 线程1读取
counter的值为100 - 线程2也读取
counter的值为100 - 线程1将值加1得到101,并写回
counter - 线程2也将值加1得到101,并写回
counter
最终counter的值为101,而不是预期的102。这就是竞态条件导致的线程安全问题。
2.2 临界区
临界区(Critical Section)是指访问共享资源的代码段。在多线程环境中,临界区需要被保护,确保同一时间只有一个线程可以执行临界区代码。
在上面的例子中,counter += 1所在的循环体就是一个临界区。
三、锁机制
3.1 锁的概念
锁(Lock)是一种同步机制,用于保护临界区,确保同一时间只有一个线程可以访问共享资源。
锁的工作原理:
- 当线程需要访问共享资源时,首先尝试获取锁
- 如果锁可用(未被其他线程持有),线程获取锁并进入临界区
- 如果锁不可用(已被其他线程持有),线程会被阻塞,直到锁可用
- 当线程完成对共享资源的访问后,释放锁,允许其他线程获取锁
3.2 Python中的Lock类
Python的threading模块提供了Lock类,用于实现互斥锁(Mutual Exclusion Lock)。
基本使用方法:
import threading
# 创建锁对象
lock = threading.Lock()
# 获取锁
lock.acquire()
try:
# 临界区代码
# 访问共享资源
finally:
# 释放锁
lock.release()更简洁的方式(推荐):
import threading
# 创建锁对象
lock = threading.Lock()
# 使用with语句自动获取和释放锁
with lock:
# 临界区代码
# 访问共享资源解释:
with lock:语句确保在进入临界区前获取锁,退出时自动释放锁- 这种方式更安全,因为即使发生异常,锁也会被正确释放
3.3 使用锁解决计数器问题
import threading
# 共享变量
counter = 0
# 创建锁对象
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 保护临界区
counter += 1
# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print(f"最终计数器值: {counter}")执行结果:
最终计数器值: 200000分析:
通过使用锁,确保了counter += 1操作的原子性,解决了线程安全问题。
3.4 Lock类的方法
| 方法 | 描述 |
|---|---|
acquire(blocking=True, timeout=-1) |
获取锁。如果blocking=True,则阻塞直到获取锁;如果timeout>0,则最多等待timeout秒;成功返回True,失败返回False。 |
release() |
释放锁。如果锁未被当前线程持有,会抛出RuntimeError。 |
locked() |
返回锁是否被持有。 |
示例:
import threading
import time
# 创建锁对象
lock = threading.Lock()
def worker():
print(f"线程 {threading.current_thread().name} 尝试获取锁")
# 尝试获取锁,最多等待2秒
if lock.acquire(timeout=2):
try:
print(f"线程 {threading.current_thread().name} 获取到锁")
time.sleep(3) # 模拟长时间持有锁
finally:
print(f"线程 {threading.current_thread().name} 释放锁")
lock.release()
else:
print(f"线程 {threading.current_thread().name} 超时,未能获取锁")
# 创建两个线程
t1 = threading.Thread(target=worker, name="Thread-1")
t2 = threading.Thread(target=worker, name="Thread-2")
# 启动线程
t1.start()
time.sleep(0.5) # 确保线程1先尝试获取锁
t2.start()
# 等待线程完成
t1.join()
t2.join()执行结果:
线程 Thread-1 尝试获取锁
线程 Thread-1 获取到锁
线程 Thread-2 尝试获取锁
线程 Thread-2 超时,未能获取锁
线程 Thread-1 释放锁四、可重入锁(RLock)
4.1 RLock的概念
可重入锁(Reentrant Lock,RLock)是一种特殊的锁,允许同一线程多次获取锁而不会导致死锁。
当线程持有RLock时,可以再次获取该锁,而不会被阻塞。每次获取锁都需要对应一次释放锁。
4.2 RLock的使用
import threading
# 创建RLock对象
rlock = threading.RLock()
# 同一线程多次获取锁
def worker():
print(f"线程 {threading.current_thread().name} 获取锁第一次")
with rlock:
print(f"线程 {threading.current_thread().name} 正在执行任务1")
time.sleep(1)
print(f"线程 {threading.current_thread().name} 尝试获取锁第二次")
with rlock:
print(f"线程 {threading.current_thread().name} 正在执行任务2")
time.sleep(1)
print(f"线程 {threading.current_thread().name} 任务2完成,释放锁第二次")
print(f"线程 {threading.current_thread().name} 任务1完成,释放锁第一次")
# 创建并启动线程
t = threading.Thread(target=worker)
t.start()
# 等待线程完成
t.join()执行结果:
线程 Thread-1 获取锁第一次
线程 Thread-1 正在执行任务1
线程 Thread-1 尝试获取锁第二次
线程 Thread-1 正在执行任务2
线程 Thread-1 任务2完成,释放锁第二次
线程 Thread-1 任务1完成,释放锁第一次4.3 Lock与RLock的区别
| 特性 | Lock | RLock |
|---|---|---|
| 同一线程多次获取 | 会死锁 | 不会死锁 |
| 性能 | 稍高 | 稍低 |
| 适用场景 | 简单临界区 | 嵌套临界区 |
| 实现复杂度 | 较低 | 较高 |
使用建议:
- 如果临界区没有嵌套,使用
Lock更高效 - 如果临界区有嵌套,必须使用
RLock避免死锁
五、死锁问题
5.1 死锁的概念
死锁(Deadlock)是指两个或多个线程相互等待对方释放锁,导致所有线程都被阻塞,无法继续执行。
5.2 死锁的产生条件
死锁的产生需要同时满足以下四个条件:
- 互斥条件:资源不能被多个线程同时占用
- 请求与保持条件:线程持有至少一个资源,同时请求获取其他线程持有的资源
- 不剥夺条件:线程已获取的资源不能被强制剥夺
- 循环等待条件:存在一个线程等待链,每个线程都在等待下一个线程持有的资源
5.3 死锁示例
import threading
import time
# 创建两个锁对象
lock1 = threading.Lock()
lock2 = threading.Lock()
def worker1():
print("线程1: 尝试获取锁1")
with lock1:
print("线程1: 获取到锁1")
time.sleep(1) # 模拟处理时间
print("线程1: 尝试获取锁2")
with lock2:
print("线程1: 获取到锁2")
# 处理任务
print("线程1: 完成任务")
def worker2():
print("线程2: 尝试获取锁2")
with lock2:
print("线程2: 获取到锁2")
time.sleep(1) # 模拟处理时间
print("线程2: 尝试获取锁1")
with lock1:
print("线程2: 获取到锁1")
# 处理任务
print("线程2: 完成任务")
# 创建并启动线程
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print("所有线程完成")执行结果:
线程1: 尝试获取锁1
线程1: 获取到锁1
线程2: 尝试获取锁2
线程2: 获取到锁2
线程1: 尝试获取锁2
线程2: 尝试获取锁1问题分析:
- 线程1持有锁1,等待获取锁2
- 线程2持有锁2,等待获取锁1
- 两个线程相互等待,形成死锁
5.4 死锁的预防方法
破坏互斥条件:允许资源被多个线程同时访问,但这在大多数情况下不可行
破坏请求与保持条件:
- 线程在请求资源时,必须释放已持有的所有资源
- 或在线程开始执行前,一次性获取所有需要的资源
破坏不剥夺条件:
- 允许线程在等待资源超时后,释放已持有的资源
- 或允许系统剥夺线程持有的资源
破坏循环等待条件:
- 为资源分配唯一的序号
- 线程必须按照序号递增的顺序请求资源
5.5 解决死锁的策略
避免策略:
- 使用
acquire(timeout)设置超时时间 - 如果超时,释放已持有的资源并重试
- 使用
检测与恢复策略:
- 定期检测是否存在死锁
- 如果检测到死锁,采取措施恢复(如终止某些线程)
预防策略:
- 按照固定的顺序请求资源
- 避免嵌套锁
- 使用超时机制
示例:使用超时机制避免死锁
import threading
import time
# 创建两个锁对象
lock1 = threading.Lock()
lock2 = threading.Lock()
def worker1():
while True:
print("线程1: 尝试获取锁1")
if lock1.acquire(timeout=2):
try:
print("线程1: 获取到锁1")
time.sleep(1)
print("线程1: 尝试获取锁2")
if lock2.acquire(timeout=2):
try:
print("线程1: 获取到锁2")
print("线程1: 完成任务")
return # 任务完成,退出线程
finally:
lock2.release()
print("线程1: 释放锁2")
else:
print("线程1: 未能获取锁2,重试")
finally:
lock1.release()
print("线程1: 释放锁1")
time.sleep(1) # 等待一段时间后重试
def worker2():
while True:
print("线程2: 尝试获取锁2")
if lock2.acquire(timeout=2):
try:
print("线程2: 获取到锁2")
time.sleep(1)
print("线程2: 尝试获取锁1")
if lock1.acquire(timeout=2):
try:
print("线程2: 获取到锁1")
print("线程2: 完成任务")
return # 任务完成,退出线程
finally:
lock1.release()
print("线程2: 释放锁1")
else:
print("线程2: 未能获取锁1,重试")
finally:
lock2.release()
print("线程2: 释放锁2")
time.sleep(1) # 等待一段时间后重试
# 创建并启动线程
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print("所有线程完成")执行结果(可能):
线程1: 尝试获取锁1
线程1: 获取到锁1
线程2: 尝试获取锁2
线程2: 获取到锁2
线程1: 尝试获取锁2
线程2: 尝试获取锁1
线程1: 未能获取锁2,重试
线程1: 释放锁1
线程2: 获取到锁1
线程2: 完成任务
线程2: 释放锁1
线程2: 释放锁2
线程1: 尝试获取锁1
线程1: 获取到锁1
线程1: 尝试获取锁2
线程1: 获取到锁2
线程1: 完成任务
线程1: 释放锁2
线程1: 释放锁1
所有线程完成分析:
通过设置超时机制,当线程无法获取所需资源时,会释放已持有的资源并重试,从而避免了死锁。
六、锁的实际应用
6.1 线程安全的计数器
import threading
class ThreadSafeCounter:
def __init__(self, initial_value=0):
self.value = initial_value
self.lock = threading.Lock()
def increment(self, delta=1):
with self.lock:
self.value += delta
return self.value
def decrement(self, delta=1):
with self.lock:
self.value -= delta
return self.value
def get_value(self):
with self.lock:
return self.value
# 使用线程安全的计数器
counter = ThreadSafeCounter(0)
def worker():
for _ in range(100000):
counter.increment()
# 创建两个线程
t1 = threading.Thread(target=worker)
t2 = threading.Thread(target=worker)
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print(f"最终计数器值: {counter.get_value()}") # 2000006.2 线程安全的队列
import threading
class ThreadSafeQueue:
def __init__(self):
self.queue = []
self.lock = threading.Lock()
def put(self, item):
with self.lock:
self.queue.append(item)
def get(self):
with self.lock:
if not self.queue:
return None
return self.queue.pop(0)
def size(self):
with self.lock:
return len(self.queue)
# 使用线程安全的队列
queue = ThreadSafeQueue()
def producer():
for i in range(10):
queue.put(f"item-{i}")
print(f"生产者: 放入 {f'item-{i}'}")
def consumer():
for _ in range(10):
item = queue.get()
print(f"消费者: 取出 {item}")
# 创建并启动线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
# 等待线程完成
producer_thread.join()
consumer_thread.join()七、总结与练习
7.1 总结
- 线程同步:协调多个线程之间的执行顺序,确保共享资源的正确访问
- 线程安全问题:由竞态条件导致的数据不一致问题
- 锁机制:保护临界区,确保同一时间只有一个线程可以访问共享资源
- Lock类:基本的互斥锁,适用于简单临界区
- RLock类:可重入锁,适用于嵌套临界区
- 死锁:两个或多个线程相互等待对方释放锁的状态
- 死锁预防:破坏死锁产生的四个条件之一
7.2 练习
基础练习:
- 编写一个线程安全的银行账户类,实现存款和取款功能
- 使用Lock类解决计数器的线程安全问题
进阶练习:
- 实现一个线程安全的栈数据结构
- 使用RLock类解决嵌套临界区的问题
- 编写一个程序,演示如何使用超时机制避免死锁
思考问题:
- 什么是线程安全?如何确保线程安全?
- Lock和RLock的区别是什么?分别适用于什么场景?
- 死锁产生的条件是什么?如何预防死锁?
八、扩展阅读
Python官方文档:
- threading模块:https://docs.python.org/3/library/threading.html
推荐书籍:
- 《Python Cookbook》(第3版)第12章:并发编程
- 《Fluent Python》第17章:并发执行
在线资源:
下集预告:第114集将学习线程同步的另一种机制——信号量,包括信号量的概念、使用方法和应用场景。