Redis分布式锁实现

分布式锁概述

在分布式系统中,多个进程可能同时访问共享资源,为了保证数据一致性和操作的原子性,需要一种机制来协调这些进程的访问,这就是分布式锁的作用。

分布式锁的基本要求

  1. 互斥性:在任何时刻,只有一个进程可以持有锁
  2. 无死锁:即使持有锁的进程崩溃,锁也能被释放
  3. 容错性:只要大部分Redis节点正常工作,锁机制就能正常运行
  4. 公平性:(可选)按照请求顺序获取锁
  5. 可重入性:(可选)同一个进程可以多次获取同一把锁

传统锁与分布式锁的区别

特性 传统锁(单机) 分布式锁
作用范围 单个进程内 跨多个进程、多个机器
实现方式 内存锁(如Java的synchronized) 基于共享存储(如Redis、ZooKeeper)
复杂性 简单 复杂(需要考虑网络延迟、节点故障等)
可靠性 取决于实现方式和底层存储的可靠性

Redis实现分布式锁的原理

Redis实现分布式锁的核心思想是利用Redis的原子操作来确保只有一个进程能成功获取锁。

基本实现思路

  1. 获取锁:使用SET命令(带NX选项)尝试设置一个键,成功表示获取锁
  2. 释放锁:使用DEL命令删除锁键
  3. 避免死锁:给锁设置过期时间,确保即使进程崩溃也能自动释放锁

基本实现

1. 简单实现

使用Redis的SET命令(带NX和EX选项)实现基本的分布式锁。

import redis
import time
import uuid

# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def acquire_lock(lock_name, expire_time=10):
    """获取分布式锁"""
    # 生成唯一标识符,用于释放锁时的身份验证
    identifier = str(uuid.uuid4())
    # 锁的键名
    lock_key = f"lock:{lock_name}"
    
    # 使用SET命令尝试获取锁
    # NX: 只有当键不存在时才设置
    # EX: 设置过期时间(秒)
    result = redis_client.set(lock_key, identifier, nx=True, ex=expire_time)
    
    if result:
        return identifier
    return None

def release_lock(lock_name, identifier):
    """释放分布式锁"""
    lock_key = f"lock:{lock_name}"
    
    # 获取锁的当前值
    current_identifier = redis_client.get(lock_key)
    
    # 验证锁的持有者是否是当前进程
    if current_identifier and current_identifier.decode('utf-8') == identifier:
        # 释放锁
        redis_client.delete(lock_key)
        return True
    return False

# 使用示例
def process_shared_resource():
    """处理共享资源"""
    # 尝试获取锁,过期时间10秒
    lock_id = acquire_lock('shared_resource', 10)
    
    if lock_id:
        try:
            print("获取锁成功,开始处理共享资源...")
            # 模拟处理共享资源的操作
            time.sleep(5)
            print("共享资源处理完成")
        finally:
            # 释放锁
            release_lock('shared_resource', lock_id)
            print("锁已释放")
    else:
        print("获取锁失败,资源忙")

2. 使用Lua脚本释放锁

上述实现存在一个问题:在获取锁值和删除锁之间可能存在竞态条件。使用Lua脚本可以确保释放锁操作的原子性。

def release_lock_safely(lock_name, identifier):
    """安全释放分布式锁"""
    lock_key = f"lock:{lock_name}"
    
    # Lua脚本,确保获取锁值和删除锁的原子性
    script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    
    # 执行Lua脚本
    result = redis_client.eval(script, 1, lock_key, identifier)
    return result == 1

# 更新释放锁函数
def release_lock(lock_name, identifier):
    return release_lock_safely(lock_name, identifier)

高级实现

1. 可重入锁

可重入锁允许同一个进程多次获取同一把锁,避免了死锁的风险。

class RedisReentrantLock:
    """Redis可重入锁"""
    def __init__(self, redis_client, lock_name, expire_time=10):
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.expire_time = expire_time
        self.lock_key = f"lock:{lock_name}"
        # 线程本地存储,记录当前线程的锁获取次数
        import threading
        self.local = threading.local()
    
    def acquire(self):
        """获取锁"""
        # 生成唯一标识符
        identifier = str(uuid.uuid4())
        
        # 检查本地存储中是否已有锁信息
        if hasattr(self.local, 'lock_count') and hasattr(self.local, 'identifier'):
            # 已获取过锁,增加计数
            self.local.lock_count += 1
            # 刷新锁的过期时间
            self.redis_client.expire(self.lock_key, self.expire_time)
            return True
        
        # 尝试获取锁
        result = self.redis_client.set(self.lock_key, identifier, nx=True, ex=self.expire_time)
        
        if result:
            # 获取锁成功,记录锁信息到本地存储
            self.local.lock_count = 1
            self.local.identifier = identifier
            return True
        return False
    
    def release(self):
        """释放锁"""
        # 检查本地存储中是否有锁信息
        if not (hasattr(self.local, 'lock_count') and hasattr(self.local, 'identifier')):
            return False
        
        # 减少计数
        self.local.lock_count -= 1
        
        if self.local.lock_count <= 0:
            # 计数为0,真正释放锁
            identifier = self.local.identifier
            # 清理本地存储
            delattr(self.local, 'lock_count')
            delattr(self.local, 'identifier')
            # 使用Lua脚本安全释放锁
            script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
            """
            result = self.redis_client.eval(script, 1, self.lock_key, identifier)
            return result == 1
        else:
            # 计数不为0,只刷新过期时间
            self.redis_client.expire(self.lock_key, self.expire_time)
            return True
    
    def __enter__(self):
        """支持with语句"""
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """支持with语句"""
        self.release()

# 使用示例
lock = RedisReentrantLock(redis_client, 'shared_resource', 10)

def nested_operation():
    """嵌套操作共享资源"""
    with lock:
        print("外层获取锁")
        # 模拟外层操作
        time.sleep(1)
        
        with lock:
            print("内层获取锁")
            # 模拟内层操作
            time.sleep(1)
        
        print("内层释放锁")
    print("外层释放锁")

2. 公平锁

公平锁按照请求顺序分配锁,避免了饥饿现象。

class RedisFairLock:
    """Redis公平锁"""
    def __init__(self, redis_client, lock_name, expire_time=10):
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.expire_time = expire_time
        self.lock_key = f"lock:{lock_name}"
        self.queue_key = f"lock:{lock_name}:queue"
        self.identifier = str(uuid.uuid4())
    
    def acquire(self, timeout=5):
        """获取锁,支持超时"""
        start_time = time.time()
        
        # 1. 将自己加入等待队列
        # 使用时间戳作为分数,确保公平性
        timestamp = time.time()
        self.redis_client.zadd(self.queue_key, {self.identifier: timestamp})
        
        try:
            while time.time() - start_time < timeout:
                # 2. 尝试获取锁(只有队列头部的进程才能获取)
                # 获取队列头部元素
                top_element = self.redis_client.zrange(self.queue_key, 0, 0)
                
                if top_element and top_element[0].decode('utf-8') == self.identifier:
                    # 是队列头部,可以尝试获取锁
                    result = self.redis_client.set(self.lock_key, self.identifier, nx=True, ex=self.expire_time)
                    if result:
                        # 获取锁成功
                        return True
                
                # 短暂休眠,避免忙等
                time.sleep(0.1)
            
            # 超时
            return False
        finally:
            # 3. 如果获取锁失败,从队列中移除自己
            if not self._is_holding_lock():
                self.redis_client.zrem(self.queue_key, self.identifier)
    
    def release(self):
        """释放锁"""
        # 1. 安全释放锁
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        result = self.redis_client.eval(script, 1, self.lock_key, self.identifier)
        
        # 2. 从队列中移除自己
        self.redis_client.zrem(self.queue_key, self.identifier)
        
        return result == 1
    
    def _is_holding_lock(self):
        """检查是否持有锁"""
        current_identifier = self.redis_client.get(self.lock_key)
        return current_identifier and current_identifier.decode('utf-8') == self.identifier
    
    def __enter__(self):
        """支持with语句"""
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """支持with语句"""
        self.release()

# 使用示例
def worker(worker_id):
    """模拟工作线程"""
    lock = RedisFairLock(redis_client, 'shared_resource', 10)
    print(f"Worker {worker_id} 尝试获取锁")
    
    if lock.acquire(timeout=10):
        try:
            print(f"Worker {worker_id} 获取锁成功,开始工作")
            # 模拟工作
            time.sleep(2)
            print(f"Worker {worker_id} 工作完成")
        finally:
            lock.release()
            print(f"Worker {worker_id} 释放锁")
    else:
        print(f"Worker {worker_id} 获取锁失败")

# 启动多个工作线程
import threading
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

3. 红锁算法(RedLock)

红锁算法是Redis官方推荐的分布式锁实现方案,它通过在多个Redis节点上获取锁来提高可靠性。

红锁算法的步骤

  1. 获取当前时间:记录开始获取锁的时间
  2. 尝试在所有节点上获取锁:在每个节点上尝试获取锁,使用相同的过期时间
  3. 计算获取锁的时间:计算从开始到成功获取锁的时间
  4. 验证获取锁是否成功
    • 成功获取锁的节点数大于等于多数派(N/2 + 1)
    • 获取锁的时间小于锁的过期时间
  5. 重新计算锁的过期时间:原始过期时间减去获取锁的时间
  6. 释放锁:在所有节点上释放锁(无论是否成功获取)

红锁实现

class RedisRedLock:
    """Redis红锁实现"""
    def __init__(self, redis_clients, lock_name, expire_time=10, retry_count=3, retry_delay=0.1):
        """
        初始化红锁
        :param redis_clients: Redis客户端列表
        :param lock_name: 锁名称
        :param expire_time: 锁过期时间(秒)
        :param retry_count: 获取锁失败时的重试次数
        :param retry_delay: 重试间隔(秒)
        """
        self.redis_clients = redis_clients
        self.lock_name = lock_name
        self.expire_time = expire_time
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.identifier = str(uuid.uuid4())
        self.lock_keys = [f"lock:{lock_name}:{i}" for i in range(len(redis_clients))]
    
    def acquire(self):
        """获取红锁"""
        for attempt in range(self.retry_count + 1):
            acquired_locks = []
            start_time = time.time()
            
            # 尝试在所有节点上获取锁
            for i, client in enumerate(self.redis_clients):
                try:
                    result = client.set(self.lock_keys[i], self.identifier, nx=True, ex=self.expire_time)
                    if result:
                        acquired_locks.append(i)
                except Exception as e:
                    # 节点故障,跳过
                    print(f"节点 {i} 故障: {e}")
            
            # 计算获取锁的时间
            acquire_time = time.time() - start_time
            
            # 验证获取锁是否成功
            majority = len(self.redis_clients) // 2 + 1
            if len(acquired_locks) >= majority and acquire_time < self.expire_time:
                # 获取锁成功,重新计算过期时间
                self.validity_time = self.expire_time - acquire_time
                print(f"获取红锁成功,在 {len(acquired_locks)} 个节点上获取了锁")
                return True
            
            # 获取锁失败,释放已获取的锁
            for i in acquired_locks:
                try:
                    self.redis_clients[i].delete(self.lock_keys[i])
                except Exception:
                    pass
            
            # 重试
            if attempt < self.retry_count:
                print(f"获取红锁失败,{self.retry_delay}秒后重试")
                time.sleep(self.retry_delay)
            else:
                print("获取红锁失败,已达到最大重试次数")
                return False
    
    def release(self):
        """释放红锁"""
        # 释放所有节点上的锁
        released_count = 0
        for i, client in enumerate(self.redis_clients):
            try:
                # 使用Lua脚本安全释放锁
                script = """
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else:
                    return 0
                end
                """
                result = client.eval(script, 1, self.lock_keys[i], self.identifier)
                if result == 1:
                    released_count += 1
            except Exception as e:
                # 节点故障,跳过
                print(f"释放节点 {i} 锁时故障: {e}")
        
        print(f"释放红锁完成,释放了 {released_count} 个节点的锁")
        return True
    
    def __enter__(self):
        """支持with语句"""
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """支持with语句"""
        self.release()

# 使用示例
# 创建多个Redis客户端(模拟多个节点)
redis_clients = [
    redis.Redis(host='localhost', port=6379, db=0),
    redis.Redis(host='localhost', port=6380, db=0),  # 假设这是另一个Redis节点
    redis.Redis(host='localhost', port=6381, db=0)   # 假设这是第三个Redis节点
]

# 初始化红锁
red_lock = RedisRedLock(redis_clients, 'critical_resource', 10)

def critical_operation():
    """执行关键操作"""
    if red_lock.acquire():
        try:
            print("获取红锁成功,执行关键操作")
            # 模拟关键操作
            time.sleep(5)
            print("关键操作完成")
        finally:
            red_lock.release()
            print("红锁已释放")
    else:
        print("获取红锁失败,无法执行关键操作")

实际应用场景

1. 分布式任务调度

在分布式系统中,确保只有一个实例执行特定任务。

def schedule_task(task_name):
    """调度任务"""
    lock_name = f"task:{task_name}"
    
    # 尝试获取锁
    lock_id = acquire_lock(lock_name, 60)  # 锁过期时间60秒
    
    if lock_id:
        try:
            print(f"开始执行任务: {task_name}")
            # 执行任务
            execute_task(task_name)
            print(f"任务执行完成: {task_name}")
        finally:
            # 释放锁
            release_lock(lock_name, lock_id)
    else:
        print(f"任务 {task_name} 已在执行中")

def execute_task(task_name):
    """执行具体任务"""
    # 模拟任务执行
    time.sleep(5)
    print(f"执行 {task_name} 的具体逻辑")

# 示例:定时清理过期数据
schedule_task("cleanup_expired_data")

2. 库存管理

在电商系统中,确保库存不会超卖。

def deduct_inventory(product_id, quantity):
    """扣减库存"""
    lock_name = f"inventory:{product_id}"
    
    # 尝试获取锁
    lock_id = acquire_lock(lock_name, 10)
    
    if lock_id:
        try:
            # 获取当前库存
            current_stock = redis_client.get(f"stock:{product_id}")
            if not current_stock:
                print(f"商品 {product_id} 不存在")
                return False
            
            current_stock = int(current_stock)
            
            # 检查库存是否充足
            if current_stock < quantity:
                print(f"商品 {product_id} 库存不足")
                return False
            
            # 扣减库存
            new_stock = current_stock - quantity
            redis_client.set(f"stock:{product_id}", new_stock)
            print(f"商品 {product_id} 库存扣减成功,当前库存: {new_stock}")
            return True
        finally:
            # 释放锁
            release_lock(lock_name, lock_id)
    else:
        print(f"商品 {product_id} 库存正在处理中,请稍后再试")
        return False

# 初始化库存
redis_client.set("stock:product1", 100)

# 模拟并发扣减库存
import threading

def simulate_concurrent_deduction():
    """模拟并发扣减库存"""
    threads = []
    for i in range(10):
        t = threading.Thread(target=deduct_inventory, args=("product1", 10))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # 检查最终库存
    final_stock = redis_client.get("stock:product1")
    print(f"最终库存: {final_stock}")

simulate_concurrent_deduction()

3. 分布式限流

结合分布式锁实现简单的分布式限流。

def rate_limit(key, limit, window):
    """
    分布式限流
    :param key: 限流键
    :param limit: 时间窗口内的最大请求数
    :param window: 时间窗口(秒)
    :return: 是否允许请求
    """
    # 使用滑动窗口计数
    current_time = int(time.time())
    window_start = current_time - window
    
    # 构建键名
    count_key = f"rate_limit:{key}"
    lock_key = f"rate_limit_lock:{key}"
    
    # 获取锁
    lock_id = acquire_lock(lock_key, 1)  # 锁过期时间1秒
    
    if lock_id:
        try:
            # 移除窗口外的计数
            redis_client.zremrangebyscore(count_key, 0, window_start)
            
            # 获取当前计数
            current_count = redis_client.zcard(count_key)
            
            if current_count < limit:
                # 允许请求,增加计数
                redis_client.zadd(count_key, {str(uuid.uuid4()): current_time})
                # 设置过期时间,避免内存泄漏
                redis_client.expire(count_key, window)
                return True
            else:
                # 超出限制
                return False
        finally:
            # 释放锁
            release_lock(lock_key, lock_id)
    else:
        # 获取锁失败,默认允许请求(避免因锁竞争导致服务不可用)
        return True

# 使用示例
def handle_request(user_id):
    """处理用户请求"""
    # 限制每个用户每分钟最多100个请求
    if rate_limit(f"user:{user_id}", 100, 60):
        print(f"处理用户 {user_id} 的请求")
        # 处理请求逻辑
    else:
        print(f"用户 {user_id} 的请求过于频繁,请稍后再试")

# 模拟用户请求
handle_request("user123")

性能优化

1. 减少锁的粒度

将大锁拆分为多个小锁,减少锁竞争。

# 优化前:使用一个大锁保护所有商品的库存
def deduct_inventory_all(product_id, quantity):
    lock_id = acquire_lock("inventory:all", 10)
    if lock_id:
        try:
            # 处理库存扣减
            pass
        finally:
            release_lock("inventory:all", lock_id)

# 优化后:使用细粒度锁,每个商品一个锁
def deduct_inventory_per_product(product_id, quantity):
    lock_id = acquire_lock(f"inventory:{product_id}", 10)
    if lock_id:
        try:
            # 处理库存扣减
            pass
        finally:
            release_lock(f"inventory:{product_id}", lock_id)

2. 使用异步获取锁

对于非关键操作,使用异步方式获取锁,避免阻塞主线程。

import asyncio

async def acquire_lock_async(lock_name, expire_time=10):
    """异步获取锁"""
    # 生成唯一标识符
    identifier = str(uuid.uuid4())
    lock_key = f"lock:{lock_name}"
    
    # 尝试获取锁
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, redis_client.set, lock_key, identifier, 'NX', 'EX', expire_time)
    
    if result:
        return identifier
    return None

async def process_async():
    """异步处理"""
    lock_id = await acquire_lock_async('async_task', 10)
    if lock_id:
        try:
            print("获取锁成功,开始异步处理")
            # 模拟异步操作
            await asyncio.sleep(5)
            print("异步处理完成")
        finally:
            # 释放锁
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, release_lock, 'async_task', lock_id)
            print("锁已释放")

# 运行异步函数
asyncio.run(process_async())

3. 缓存锁信息

对于频繁获取的锁,可以在本地缓存锁信息,减少Redis操作。

class LockCache:
    """锁缓存"""
    def __init__(self):
        self.locks = {}
        self.lock_expiry = {}
    
    def is_locked_locally(self, lock_name):
        """检查锁是否在本地缓存中"""
        if lock_name in self.locks:
            # 检查锁是否过期
            if time.time() < self.lock_expiry.get(lock_name, 0):
                return True
            # 锁已过期,从缓存中移除
            del self.locks[lock_name]
            del self.lock_expiry[lock_name]
        return False
    
    def add_lock(self, lock_name, identifier, expiry):
        """添加锁到缓存"""
        self.locks[lock_name] = identifier
        self.lock_expiry[lock_name] = time.time() + expiry
    
    def remove_lock(self, lock_name):
        """从缓存中移除锁"""
        if lock_name in self.locks:
            del self.locks[lock_name]
            del self.lock_expiry[lock_name]

# 创建锁缓存
lock_cache = LockCache()

# 优化获取锁函数
def acquire_lock_with_cache(lock_name, expire_time=10):
    """使用缓存获取锁"""
    # 先检查本地缓存
    if lock_cache.is_locked_locally(lock_name):
        return None
    
    # 尝试从Redis获取锁
    identifier = acquire_lock(lock_name, expire_time)
    if identifier:
        # 添加到本地缓存
        lock_cache.add_lock(lock_name, identifier, expire_time * 0.8)  # 缓存时间略短于锁过期时间
    return identifier

# 优化释放锁函数
def release_lock_with_cache(lock_name, identifier):
    """使用缓存释放锁"""
    # 从本地缓存中移除
    lock_cache.remove_lock(lock_name)
    # 从Redis释放
    return release_lock(lock_name, identifier)

安全性考虑

1. 锁过期时间的设置

  • 过短:可能导致锁在操作完成前就过期,造成并发问题
  • 过长:可能导致系统在进程崩溃后长时间无法获取锁

建议:根据操作的平均执行时间设置过期时间,通常为平均执行时间的2-3倍。

2. 网络延迟的影响

在分布式环境中,网络延迟可能导致锁的状态不一致。

解决方案

  • 使用Lua脚本确保操作的原子性
  • 实现锁的续约机制,对于长时间运行的操作

3. 节点故障的处理

Redis节点故障可能导致锁丢失。

解决方案

  • 使用Redis集群或哨兵模式提高可用性
  • 对于关键场景,使用红锁算法

4. 锁竞争的处理

高并发场景下,锁竞争可能导致性能下降。

解决方案

  • 使用细粒度锁
  • 实现锁的等待队列
  • 合理设置重试策略

最佳实践

  1. 选择合适的实现方式

    • 简单场景:使用基本实现
    • 需要可重入性:使用可重入锁实现
    • 高可靠性要求:使用红锁实现
  2. 合理设置锁的过期时间

    • 根据操作的执行时间设置
    • 考虑添加锁续约机制
  3. 使用Lua脚本确保原子性

    • 获取锁时使用SET命令(带NX选项)
    • 释放锁时使用Lua脚本
  4. 实现错误处理和重试机制

    • 处理Redis连接失败的情况
    • 实现合理的重试策略
  5. 监控和告警

    • 监控锁的获取和释放情况
    • 对长时间持有锁的情况进行告警
  6. 测试锁的可靠性

    • 模拟各种故障场景(进程崩溃、网络分区等)
    • 测试并发性能

小结

Redis是实现分布式锁的优秀选择,它提供了高性能、简单易用的特性。通过本教程的学习,你应该已经掌握了:

  • Redis分布式锁的基本原理和实现方法
  • 不同类型的分布式锁实现(简单锁、可重入锁、公平锁、红锁)
  • 分布式锁在实际场景中的应用
  • 性能优化和安全性考虑
  • 最佳实践和注意事项

在实际项目中,选择合适的分布式锁实现方式需要考虑系统的可靠性要求、性能需求以及复杂度等因素。对于关键业务场景,建议使用经过验证的开源库(如Redisson)来实现分布式锁,以确保可靠性和安全性。

« 上一篇 Redis会话存储 下一篇 » Redis缓存策略