第113集:线程同步:锁机制

学习目标

  1. 理解线程同步的概念和必要性
  2. 掌握线程安全问题的产生原因和解决方案
  3. 学会使用Python中的Lock类实现线程同步
  4. 理解RLock(可重入锁)的概念和使用场景
  5. 掌握死锁的概念、产生条件和预防方法
  6. 学会在实际项目中合理使用锁机制

一、线程同步的概念

1.1 什么是线程同步

线程同步(Thread Synchronization)是指协调多个线程之间的执行顺序,确保它们按照预期的方式访问共享资源,避免数据不一致或竞态条件。

在多线程环境中,当多个线程同时访问共享资源时,可能会导致数据不一致的问题。线程同步机制就是用来解决这些问题的。

1.2 为什么需要线程同步

考虑以下情况:两个线程同时更新同一个银行账户的余额。

import threading
import time

# 银行账户类
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
    
    def deposit(self, amount):
        """存款操作"""
        print(f"开始存款: {amount}")
        time.sleep(0.01)  # 模拟网络延迟
        self.balance += amount
        print(f"存款完成: {amount}, 当前余额: {self.balance}")
    
    def withdraw(self, amount):
        """取款操作"""
        print(f"开始取款: {amount}")
        time.sleep(0.01)  # 模拟网络延迟
        if self.balance >= amount:
            self.balance -= amount
            print(f"取款完成: {amount}, 当前余额: {self.balance}")
        else:
            print(f"余额不足,取款失败")

# 创建银行账户实例
account = BankAccount(1000)

# 定义存款和取款函数
def deposit_task():
    for _ in range(5):
        account.deposit(100)

def withdraw_task():
    for _ in range(5):
        account.withdraw(200)

# 创建并启动线程
deposit_thread = threading.Thread(target=deposit_task)
withdraw_thread = threading.Thread(target=withdraw_task)

deposit_thread.start()
withdraw_thread.start()

# 等待线程完成
deposit_thread.join()
withdraw_thread.join()

print(f"最终余额: {account.balance}")

执行结果(可能):

开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 1100
取款完成: 200, 当前余额: 900
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 1000
取款完成: 200, 当前余额: 800
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 900
取款完成: 200, 当前余额: 700
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 800
取款完成: 200, 当前余额: 600
开始存款: 100
开始取款: 200
存款完成: 100, 当前余额: 700
取款完成: 200, 当前余额: 500
最终余额: 500

分析:

  • 初始余额:1000
  • 存款操作:5次 × 100 = 500
  • 取款操作:5次 × 200 = 1000
  • 预期最终余额:1000 + 500 - 1000 = 500

在这个例子中,最终余额是正确的,但这只是巧合。在更复杂的情况下,可能会出现数据不一致的问题。

二、线程安全问题

2.1 竞态条件

竞态条件(Race Condition)是指当多个线程同时访问共享资源时,最终结果取决于线程执行的时序。

考虑以下简单的计数器例子:

import threading

# 共享变量
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

# 启动线程
t1.start()
t2.start()

# 等待线程完成
t1.join()
t2.join()

print(f"最终计数器值: {counter}")

执行结果(可能):

最终计数器值: 156789

预期结果:

最终计数器值: 200000

问题分析:
counter += 1看似是一个原子操作,但实际上它包含三个步骤:

  1. 读取counter的当前值
  2. 将值加1
  3. 将新值写回counter

当两个线程同时执行这个操作时,可能会出现以下情况:

  • 线程1读取counter的值为100
  • 线程2也读取counter的值为100
  • 线程1将值加1得到101,并写回counter
  • 线程2也将值加1得到101,并写回counter

最终counter的值为101,而不是预期的102。这就是竞态条件导致的线程安全问题。

2.2 临界区

临界区(Critical Section)是指访问共享资源的代码段。在多线程环境中,临界区需要被保护,确保同一时间只有一个线程可以执行临界区代码。

在上面的例子中,counter += 1所在的循环体就是一个临界区。

三、锁机制

3.1 锁的概念

锁(Lock)是一种同步机制,用于保护临界区,确保同一时间只有一个线程可以访问共享资源。

锁的工作原理:

  1. 当线程需要访问共享资源时,首先尝试获取锁
  2. 如果锁可用(未被其他线程持有),线程获取锁并进入临界区
  3. 如果锁不可用(已被其他线程持有),线程会被阻塞,直到锁可用
  4. 当线程完成对共享资源的访问后,释放锁,允许其他线程获取锁

3.2 Python中的Lock类

Python的threading模块提供了Lock类,用于实现互斥锁(Mutual Exclusion Lock)。

基本使用方法:

import threading

# 创建锁对象
lock = threading.Lock()

# 获取锁
lock.acquire()
try:
    # 临界区代码
    # 访问共享资源
finally:
    # 释放锁
    lock.release()

更简洁的方式(推荐):

import threading

# 创建锁对象
lock = threading.Lock()

# 使用with语句自动获取和释放锁
with lock:
    # 临界区代码
    # 访问共享资源

解释:

  • with lock:语句确保在进入临界区前获取锁,退出时自动释放锁
  • 这种方式更安全,因为即使发生异常,锁也会被正确释放

3.3 使用锁解决计数器问题

import threading

# 共享变量
counter = 0

# 创建锁对象
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 保护临界区
            counter += 1

# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

# 启动线程
t1.start()
t2.start()

# 等待线程完成
t1.join()
t2.join()

print(f"最终计数器值: {counter}")

执行结果:

最终计数器值: 200000

分析:
通过使用锁,确保了counter += 1操作的原子性,解决了线程安全问题。

3.4 Lock类的方法

方法 描述
acquire(blocking=True, timeout=-1) 获取锁。如果blocking=True,则阻塞直到获取锁;如果timeout>0,则最多等待timeout秒;成功返回True,失败返回False
release() 释放锁。如果锁未被当前线程持有,会抛出RuntimeError
locked() 返回锁是否被持有。

示例:

import threading
import time

# 创建锁对象
lock = threading.Lock()

def worker():
    print(f"线程 {threading.current_thread().name} 尝试获取锁")
    # 尝试获取锁,最多等待2秒
    if lock.acquire(timeout=2):
        try:
            print(f"线程 {threading.current_thread().name} 获取到锁")
            time.sleep(3)  # 模拟长时间持有锁
        finally:
            print(f"线程 {threading.current_thread().name} 释放锁")
            lock.release()
    else:
        print(f"线程 {threading.current_thread().name} 超时,未能获取锁")

# 创建两个线程
t1 = threading.Thread(target=worker, name="Thread-1")
t2 = threading.Thread(target=worker, name="Thread-2")

# 启动线程
t1.start()
time.sleep(0.5)  # 确保线程1先尝试获取锁
t2.start()

# 等待线程完成
t1.join()
t2.join()

执行结果:

线程 Thread-1 尝试获取锁
线程 Thread-1 获取到锁
线程 Thread-2 尝试获取锁
线程 Thread-2 超时,未能获取锁
线程 Thread-1 释放锁

四、可重入锁(RLock)

4.1 RLock的概念

可重入锁(Reentrant Lock,RLock)是一种特殊的锁,允许同一线程多次获取锁而不会导致死锁。

当线程持有RLock时,可以再次获取该锁,而不会被阻塞。每次获取锁都需要对应一次释放锁。

4.2 RLock的使用

import threading

# 创建RLock对象
rlock = threading.RLock()

# 同一线程多次获取锁
def worker():
    print(f"线程 {threading.current_thread().name} 获取锁第一次")
    with rlock:
        print(f"线程 {threading.current_thread().name} 正在执行任务1")
        time.sleep(1)
        
        print(f"线程 {threading.current_thread().name} 尝试获取锁第二次")
        with rlock:
            print(f"线程 {threading.current_thread().name} 正在执行任务2")
            time.sleep(1)
        
        print(f"线程 {threading.current_thread().name} 任务2完成,释放锁第二次")
    print(f"线程 {threading.current_thread().name} 任务1完成,释放锁第一次")

# 创建并启动线程
t = threading.Thread(target=worker)
t.start()

# 等待线程完成
t.join()

执行结果:

线程 Thread-1 获取锁第一次
线程 Thread-1 正在执行任务1
线程 Thread-1 尝试获取锁第二次
线程 Thread-1 正在执行任务2
线程 Thread-1 任务2完成,释放锁第二次
线程 Thread-1 任务1完成,释放锁第一次

4.3 Lock与RLock的区别

特性 Lock RLock
同一线程多次获取 会死锁 不会死锁
性能 稍高 稍低
适用场景 简单临界区 嵌套临界区
实现复杂度 较低 较高

使用建议:

  • 如果临界区没有嵌套,使用Lock更高效
  • 如果临界区有嵌套,必须使用RLock避免死锁

五、死锁问题

5.1 死锁的概念

死锁(Deadlock)是指两个或多个线程相互等待对方释放锁,导致所有线程都被阻塞,无法继续执行。

5.2 死锁的产生条件

死锁的产生需要同时满足以下四个条件:

  1. 互斥条件:资源不能被多个线程同时占用
  2. 请求与保持条件:线程持有至少一个资源,同时请求获取其他线程持有的资源
  3. 不剥夺条件:线程已获取的资源不能被强制剥夺
  4. 循环等待条件:存在一个线程等待链,每个线程都在等待下一个线程持有的资源

5.3 死锁示例

import threading
import time

# 创建两个锁对象
lock1 = threading.Lock()
lock2 = threading.Lock()

def worker1():
    print("线程1: 尝试获取锁1")
    with lock1:
        print("线程1: 获取到锁1")
        time.sleep(1)  # 模拟处理时间
        
        print("线程1: 尝试获取锁2")
        with lock2:
            print("线程1: 获取到锁2")
            # 处理任务
    print("线程1: 完成任务")

def worker2():
    print("线程2: 尝试获取锁2")
    with lock2:
        print("线程2: 获取到锁2")
        time.sleep(1)  # 模拟处理时间
        
        print("线程2: 尝试获取锁1")
        with lock1:
            print("线程2: 获取到锁1")
            # 处理任务
    print("线程2: 完成任务")

# 创建并启动线程
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)

t1.start()
t2.start()

# 等待线程完成
t1.join()
t2.join()

print("所有线程完成")

执行结果:

线程1: 尝试获取锁1
线程1: 获取到锁1
线程2: 尝试获取锁2
线程2: 获取到锁2
线程1: 尝试获取锁2
线程2: 尝试获取锁1

问题分析:

  • 线程1持有锁1,等待获取锁2
  • 线程2持有锁2,等待获取锁1
  • 两个线程相互等待,形成死锁

5.4 死锁的预防方法

  1. 破坏互斥条件:允许资源被多个线程同时访问,但这在大多数情况下不可行

  2. 破坏请求与保持条件

    • 线程在请求资源时,必须释放已持有的所有资源
    • 或在线程开始执行前,一次性获取所有需要的资源
  3. 破坏不剥夺条件

    • 允许线程在等待资源超时后,释放已持有的资源
    • 或允许系统剥夺线程持有的资源
  4. 破坏循环等待条件

    • 为资源分配唯一的序号
    • 线程必须按照序号递增的顺序请求资源

5.5 解决死锁的策略

  1. 避免策略

    • 使用acquire(timeout)设置超时时间
    • 如果超时,释放已持有的资源并重试
  2. 检测与恢复策略

    • 定期检测是否存在死锁
    • 如果检测到死锁,采取措施恢复(如终止某些线程)
  3. 预防策略

    • 按照固定的顺序请求资源
    • 避免嵌套锁
    • 使用超时机制

示例:使用超时机制避免死锁

import threading
import time

# 创建两个锁对象
lock1 = threading.Lock()
lock2 = threading.Lock()

def worker1():
    while True:
        print("线程1: 尝试获取锁1")
        if lock1.acquire(timeout=2):
            try:
                print("线程1: 获取到锁1")
                time.sleep(1)
                
                print("线程1: 尝试获取锁2")
                if lock2.acquire(timeout=2):
                    try:
                        print("线程1: 获取到锁2")
                        print("线程1: 完成任务")
                        return  # 任务完成,退出线程
                    finally:
                        lock2.release()
                        print("线程1: 释放锁2")
                else:
                    print("线程1: 未能获取锁2,重试")
            finally:
                lock1.release()
                print("线程1: 释放锁1")
        time.sleep(1)  # 等待一段时间后重试

def worker2():
    while True:
        print("线程2: 尝试获取锁2")
        if lock2.acquire(timeout=2):
            try:
                print("线程2: 获取到锁2")
                time.sleep(1)
                
                print("线程2: 尝试获取锁1")
                if lock1.acquire(timeout=2):
                    try:
                        print("线程2: 获取到锁1")
                        print("线程2: 完成任务")
                        return  # 任务完成,退出线程
                    finally:
                        lock1.release()
                        print("线程2: 释放锁1")
                else:
                    print("线程2: 未能获取锁1,重试")
            finally:
                lock2.release()
                print("线程2: 释放锁2")
        time.sleep(1)  # 等待一段时间后重试

# 创建并启动线程
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)

t1.start()
t2.start()

# 等待线程完成
t1.join()
t2.join()

print("所有线程完成")

执行结果(可能):

线程1: 尝试获取锁1
线程1: 获取到锁1
线程2: 尝试获取锁2
线程2: 获取到锁2
线程1: 尝试获取锁2
线程2: 尝试获取锁1
线程1: 未能获取锁2,重试
线程1: 释放锁1
线程2: 获取到锁1
线程2: 完成任务
线程2: 释放锁1
线程2: 释放锁2
线程1: 尝试获取锁1
线程1: 获取到锁1
线程1: 尝试获取锁2
线程1: 获取到锁2
线程1: 完成任务
线程1: 释放锁2
线程1: 释放锁1
所有线程完成

分析:
通过设置超时机制,当线程无法获取所需资源时,会释放已持有的资源并重试,从而避免了死锁。

六、锁的实际应用

6.1 线程安全的计数器

import threading

class ThreadSafeCounter:
    def __init__(self, initial_value=0):
        self.value = initial_value
        self.lock = threading.Lock()
    
    def increment(self, delta=1):
        with self.lock:
            self.value += delta
            return self.value
    
    def decrement(self, delta=1):
        with self.lock:
            self.value -= delta
            return self.value
    
    def get_value(self):
        with self.lock:
            return self.value

# 使用线程安全的计数器
counter = ThreadSafeCounter(0)

def worker():
    for _ in range(100000):
        counter.increment()

# 创建两个线程
t1 = threading.Thread(target=worker)
t2 = threading.Thread(target=worker)

# 启动线程
t1.start()
t2.start()

# 等待线程完成
t1.join()
t2.join()

print(f"最终计数器值: {counter.get_value()}")  # 200000

6.2 线程安全的队列

import threading

class ThreadSafeQueue:
    def __init__(self):
        self.queue = []
        self.lock = threading.Lock()
    
    def put(self, item):
        with self.lock:
            self.queue.append(item)
    
    def get(self):
        with self.lock:
            if not self.queue:
                return None
            return self.queue.pop(0)
    
    def size(self):
        with self.lock:
            return len(self.queue)

# 使用线程安全的队列
queue = ThreadSafeQueue()

def producer():
    for i in range(10):
        queue.put(f"item-{i}")
        print(f"生产者: 放入 {f'item-{i}'}")

def consumer():
    for _ in range(10):
        item = queue.get()
        print(f"消费者: 取出 {item}")

# 创建并启动线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# 等待线程完成
producer_thread.join()
consumer_thread.join()

七、总结与练习

7.1 总结

  1. 线程同步:协调多个线程之间的执行顺序,确保共享资源的正确访问
  2. 线程安全问题:由竞态条件导致的数据不一致问题
  3. 锁机制:保护临界区,确保同一时间只有一个线程可以访问共享资源
  4. Lock类:基本的互斥锁,适用于简单临界区
  5. RLock类:可重入锁,适用于嵌套临界区
  6. 死锁:两个或多个线程相互等待对方释放锁的状态
  7. 死锁预防:破坏死锁产生的四个条件之一

7.2 练习

  1. 基础练习

    • 编写一个线程安全的银行账户类,实现存款和取款功能
    • 使用Lock类解决计数器的线程安全问题
  2. 进阶练习

    • 实现一个线程安全的栈数据结构
    • 使用RLock类解决嵌套临界区的问题
    • 编写一个程序,演示如何使用超时机制避免死锁
  3. 思考问题

    • 什么是线程安全?如何确保线程安全?
    • Lock和RLock的区别是什么?分别适用于什么场景?
    • 死锁产生的条件是什么?如何预防死锁?

八、扩展阅读

  1. Python官方文档:

  2. 推荐书籍:

    • 《Python Cookbook》(第3版)第12章:并发编程
    • 《Fluent Python》第17章:并发执行
  3. 在线资源:


下集预告:第114集将学习线程同步的另一种机制——信号量,包括信号量的概念、使用方法和应用场景。

« 上一篇 线程创建与启动 下一篇 » 线程同步:信号量