第114集:线程同步:信号量

学习目标

  1. 理解信号量(Semaphore)的概念和工作原理
  2. 掌握信号量与锁(Lock)的区别和联系
  3. 学会使用Python中的Semaphore类实现线程同步
  4. 理解计数信号量和二进制信号量的概念
  5. 掌握信号量在实际项目中的应用场景
  6. 学会使用信号量解决资源限制和并发控制问题

一、信号量的基本概念

1.1 什么是信号量

信号量(Semaphore)是一种用于控制对共享资源访问的线程同步机制,它允许多个线程同时访问共享资源,但限制了同时访问的最大线程数量。

信号量维护了一个计数器,该计数器表示当前可用资源的数量。当线程需要访问资源时,它会尝试获取信号量(减少计数器);当线程完成资源使用时,它会释放信号量(增加计数器)。

1.2 信号量的工作原理

信号量的工作原理可以简单概括为:

  1. 初始化信号量时,设置一个初始计数器值
  2. 当线程需要访问资源时,调用acquire()方法:
    • 如果计数器大于0,计数器减1,线程继续执行
    • 如果计数器等于0,线程阻塞,等待其他线程释放资源
  3. 当线程完成资源使用时,调用release()方法:
    • 计数器加1
    • 如果有线程在等待,唤醒其中一个线程

1.3 信号量与锁的区别

特性 锁(Lock) 信号量(Semaphore)
资源访问数量 1个线程 多个线程(可配置)
资源类型 互斥资源 共享资源池
基本操作 acquire()/release() acquire()/release()
计数器初始值 1 可配置(≥0)
适用场景 保护临界区 限制并发访问数量

简单来说,锁是信号量的一种特殊情况(二进制信号量,计数器初始值为1)。

二、Python中的信号量实现

2.1 threading.Semaphore类

Python的threading模块提供了Semaphore类,用于实现信号量机制。

基本使用语法:

import threading

# 创建信号量,初始值为3
semaphore = threading.Semaphore(3)

# 获取信号量
semaphore.acquire()

# 访问共享资源
# ...

# 释放信号量
semaphore.release()

2.2 计数信号量

计数信号量(Counting Semaphore)允许指定数量的线程同时访问共享资源。例如,如果我们将信号量的初始值设置为3,那么最多只能有3个线程同时访问资源。

import threading
import time

# 创建信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)

# 工作函数
def worker(name):
    print(f"线程 {name} 尝试获取信号量")
    semaphore.acquire()
    try:
        print(f"线程 {name} 已获取信号量,开始工作")
        time.sleep(2)  # 模拟工作
        print(f"线程 {name} 工作完成")
    finally:
        semaphore.release()
        print(f"线程 {name} 已释放信号量")

# 创建5个线程
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

执行结果:

线程 0 尝试获取信号量
线程 0 已获取信号量,开始工作
线程 1 尝试获取信号量
线程 1 已获取信号量,开始工作
线程 2 尝试获取信号量
线程 2 已获取信号量,开始工作
线程 3 尝试获取信号量
线程 4 尝试获取信号量
线程 0 工作完成
线程 0 已释放信号量
线程 3 已获取信号量,开始工作
线程 1 工作完成
线程 1 已释放信号量
线程 4 已获取信号量,开始工作
线程 2 工作完成
线程 2 已释放信号量
线程 3 工作完成
线程 3 已释放信号量
线程 4 工作完成
线程 4 已释放信号量

2.3 二进制信号量

二进制信号量(Binary Semaphore)是计数信号量的一种特殊情况,其初始值为1。它的行为类似于锁,但有一些细微的区别:

  • 二进制信号量可以由一个线程获取,另一个线程释放
  • 锁必须由获取它的线程释放
import threading

# 创建二进制信号量(初始值为1)
binary_semaphore = threading.Semaphore(1)

# 使用方式与锁类似
binary_semaphore.acquire()
try:
    # 访问临界区
    pass
finally:
    binary_semaphore.release()

三、信号量的应用场景

3.1 资源池管理

信号量常用于管理资源池,如数据库连接池、线程池等。

示例:数据库连接池

import threading
import time
import random

class ConnectionPool:
    def __init__(self, size):
        self.size = size
        self.connections = [f"Connection-{i}" for i in range(size)]
        self.semaphore = threading.Semaphore(size)  # 信号量初始值等于连接池大小
        self.lock = threading.Lock()  # 保护连接列表的锁
    
    def get_connection(self):
        # 获取信号量(限制并发访问数量)
        self.semaphore.acquire()
        
        # 获取连接(需要锁保护)
        with self.lock:
            connection = self.connections.pop()
            print(f"获取连接: {connection}")
            return connection
    
    def release_connection(self, connection):
        # 释放连接(需要锁保护)
        with self.lock:
            self.connections.append(connection)
            print(f"释放连接: {connection}")
        
        # 释放信号量
        self.semaphore.release()

# 创建连接池(大小为3)
pool = ConnectionPool(3)

# 工作函数
def worker(name):
    # 获取连接
    connection = pool.get_connection()
    
    try:
        # 使用连接(模拟数据库操作)
        print(f"线程 {name} 使用连接 {connection} 进行数据库操作")
        time.sleep(random.uniform(1, 3))  # 模拟操作时间
    finally:
        # 释放连接
        pool.release_connection(connection)

# 创建8个线程,模拟并发访问
threads = []
for i in range(8):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

3.2 限制并发访问数量

信号量可以用于限制同时访问某个资源的线程数量,例如限制同时下载的文件数量、限制API请求的并发数等。

示例:限制并发下载数量

import threading
import time
import random

# 创建信号量,限制最多3个并发下载
download_semaphore = threading.Semaphore(3)

# 下载函数
def download_file(file_name):
    print(f"尝试下载文件: {file_name}")
    
    # 获取信号量(限制并发数)
    download_semaphore.acquire()
    
    try:
        print(f"开始下载文件: {file_name}")
        # 模拟下载过程
        download_time = random.uniform(2, 5)
        time.sleep(download_time)
        print(f"文件下载完成: {file_name},耗时 {download_time:.2f} 秒")
    finally:
        # 释放信号量
        download_semaphore.release()

# 要下载的文件列表
files = [
    "file1.mp4", "file2.mp4", "file3.mp4", "file4.mp4",
    "file5.mp4", "file6.mp4", "file7.mp4", "file8.mp4"
]

# 创建线程下载所有文件
threads = []
for file in files:
    thread = threading.Thread(target=download_file, args=(file,))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print("所有文件下载完成!")

3.3 生产者-消费者模式

信号量也可以用于实现生产者-消费者模式,控制生产者和消费者的速度。

示例:使用信号量实现生产者-消费者

import threading
import time
import random
from queue import Queue

# 共享队列
queue = Queue(maxsize=5)

# 信号量
empty = threading.Semaphore(5)  # 初始值为队列容量,表示空位置数量
full = threading.Semaphore(0)   # 初始值为0,表示满位置数量

# 生产者函数
def producer(name):
    for i in range(10):
        # 生产数据
        item = f"{name}-{i}"
        print(f"生产者 {name} 生产: {item}")
        
        # 获取空位置信号量
        empty.acquire()
        
        # 将数据放入队列
        queue.put(item)
        print(f"生产者 {name} 将 {item} 放入队列")
        
        # 释放满位置信号量
        full.release()
        
        # 模拟生产时间
        time.sleep(random.uniform(0.5, 1.5))

# 消费者函数
def consumer(name):
    for i in range(10):
        # 获取满位置信号量
        full.acquire()
        
        # 从队列取出数据
        item = queue.get()
        print(f"消费者 {name} 从队列取出: {item}")
        
        # 消费数据
        print(f"消费者 {name} 消费: {item}")
        
        # 释放空位置信号量
        empty.release()
        
        # 模拟消费时间
        time.sleep(random.uniform(0.5, 2.0))

# 创建生产者和消费者线程
producers = [
    threading.Thread(target=producer, args=("P1",)),
    threading.Thread(target=producer, args=("P2",))
]

consumers = [
    threading.Thread(target=consumer, args=("C1",)),
    threading.Thread(target=consumer, args=("C2",))
]

# 启动线程
for p in producers:
    p.start()

for c in consumers:
    c.start()

# 等待所有线程完成
for p in producers:
    p.join()

for c in consumers:
    c.join()

print("所有生产和消费完成!")

四、信号量的注意事项

4.1 信号量的初始值设置

信号量的初始值应该根据实际需求设置:

  • 如果是限制并发访问数量,初始值应设置为允许的最大并发数
  • 如果是资源池管理,初始值应设置为资源池的大小
  • 如果是二进制信号量,初始值应设置为1

4.2 信号量的获取和释放

  • 总是在try...finally块中使用信号量,确保无论是否发生异常,信号量都会被释放
  • 避免在信号量保护的临界区内执行耗时过长的操作,以免影响其他线程
  • 不要忘记释放信号量,否则会导致资源泄漏和死锁

4.3 信号量与其他同步机制的结合

信号量可以与其他线程同步机制(如锁、条件变量等)结合使用,以实现更复杂的同步需求。

五、信号量的常见应用案例

5.1 Web服务器连接限制

Web服务器使用信号量来限制同时处理的请求数量,防止服务器过载。

import threading
import time

# 限制最多同时处理5个请求
request_semaphore = threading.Semaphore(5)

def handle_request(request_id):
    print(f"收到请求: {request_id}")
    
    # 获取信号量
    request_semaphore.acquire()
    
    try:
        print(f"开始处理请求: {request_id}")
        # 模拟处理时间
        time.sleep(2)
        print(f"请求处理完成: {request_id}")
    finally:
        # 释放信号量
        request_semaphore.release()

# 模拟20个请求
for i in range(20):
    threading.Thread(target=handle_request, args=(i,)).start()
    time.sleep(0.5)  # 模拟请求间隔

5.2 数据库连接池

数据库连接池使用信号量来管理数据库连接的分配和回收。

import threading
import time

class DBPool:
    def __init__(self, size):
        self.size = size
        self.connections = [f"DB-Conn-{i}" for i in range(size)]
        self.semaphore = threading.Semaphore(size)
        self.lock = threading.Lock()
    
    def get_connection(self):
        self.semaphore.acquire()
        with self.lock:
            return self.connections.pop()
    
    def release_connection(self, conn):
        with self.lock:
            self.connections.append(conn)
        self.semaphore.release()

# 创建连接池(大小为3)
db_pool = DBPool(3)

def execute_query(thread_id):
    conn = db_pool.get_connection()
    print(f"线程 {thread_id} 使用连接 {conn} 执行查询")
    time.sleep(1)  # 模拟查询时间
    db_pool.release_connection(conn)
    print(f"线程 {thread_id} 释放连接 {conn}")

# 10个线程同时查询
for i in range(10):
    threading.Thread(target=execute_query, args=(i,)).start()

5.3 线程池实现

线程池使用信号量来控制活跃线程的数量。

import threading
import time
import queue

class ThreadPool:
    def __init__(self, size):
        self.size = size
        self.task_queue = queue.Queue()
        self.semaphore = threading.Semaphore(size)
        self.workers = []
        self.running = True
        
        # 创建工作线程
        for i in range(size):
            worker = threading.Thread(target=self.worker, args=(i,))
            self.workers.append(worker)
            worker.start()
    
    def worker(self, worker_id):
        while self.running:
            try:
                # 等待任务
                task, args, kwargs = self.task_queue.get(timeout=1)
                
                # 执行任务
                print(f"工作线程 {worker_id} 执行任务")
                task(*args, **kwargs)
                
                # 标记任务完成
                self.task_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"工作线程 {worker_id} 发生错误: {e}")
    
    def submit(self, task, *args, **kwargs):
        """提交任务到线程池"""
        self.task_queue.put((task, args, kwargs))
    
    def shutdown(self):
        """关闭线程池"""
        self.running = False
        for worker in self.workers:
            worker.join()

# 测试函数
def task(name):
    print(f"开始执行任务: {name}")
    time.sleep(1)  # 模拟任务执行时间
    print(f"任务执行完成: {name}")

# 创建线程池(大小为3)
pool = ThreadPool(3)

# 提交10个任务
for i in range(10):
    pool.submit(task, f"Task-{i}")

# 等待所有任务完成
pool.task_queue.join()

# 关闭线程池
pool.shutdown()

print("所有任务完成!")

六、总结

6.1 信号量的优缺点

优点:

  • 灵活控制并发访问数量
  • 适用于资源池管理
  • 可以实现复杂的同步模式
  • 支持多个线程同时访问资源

缺点:

  • 使用不当容易导致死锁
  • 比锁的实现更复杂
  • 计数器管理需要额外的开销

6.2 信号量的最佳实践

  1. 根据实际需求合理设置信号量的初始值
  2. 始终在try...finally块中使用信号量
  3. 避免在信号量保护的临界区内执行耗时操作
  4. 结合其他同步机制使用,实现更复杂的同步需求
  5. 定期检查信号量的使用情况,避免资源泄漏

七、课后练习

  1. 练习1:实现一个简单的连接池

    • 使用信号量实现一个容量为5的连接池
    • 连接池应支持获取连接和释放连接的操作
    • 创建10个线程同时使用连接池,验证连接池的正确性
  2. 练习2:限制并发下载数量

    • 使用信号量限制最多同时下载3个文件
    • 模拟下载10个文件,每个文件下载时间为1-3秒
    • 打印下载的开始和完成信息,验证并发限制的正确性
  3. 练习3:使用信号量实现生产者-消费者模式

    • 创建一个容量为10的队列
    • 实现2个生产者和3个消费者
    • 使用信号量控制生产者和消费者的速度
    • 生产者每秒生产1个数据,消费者每秒消费2个数据
  4. 练习4:Web服务器请求限制

    • 实现一个简单的Web服务器请求处理函数
    • 使用信号量限制最多同时处理10个请求
    • 模拟20个并发请求,验证请求限制的正确性

八、学习资源推荐

  1. Python官方文档:https://docs.python.org/3/library/threading.html#semaphore-objects
  2. 《Python并发编程》(O'Reilly)
  3. 《深入理解计算机系统》(机械工业出版社)- 关于信号量的理论基础
  4. 在线课程:Python并发编程实战(慕课网)

九、下集预告

下一集我们将学习线程同步的另一种机制:事件(Event)。事件用于线程间的通信,可以让一个线程等待另一个线程发送信号。我们将学习事件的概念、工作原理和Python实现,并通过实际案例展示事件的应用。

敬请期待第115集:线程同步:事件!

« 上一篇 线程同步:锁机制 下一篇 » 线程同步:事件