Redis分布式锁实现
分布式锁概述
在分布式系统中,多个进程可能同时访问共享资源,为了保证数据一致性和操作的原子性,需要一种机制来协调这些进程的访问,这就是分布式锁的作用。
分布式锁的基本要求
- 互斥性:在任何时刻,只有一个进程可以持有锁
- 无死锁:即使持有锁的进程崩溃,锁也能被释放
- 容错性:只要大部分Redis节点正常工作,锁机制就能正常运行
- 公平性:(可选)按照请求顺序获取锁
- 可重入性:(可选)同一个进程可以多次获取同一把锁
传统锁与分布式锁的区别
| 特性 | 传统锁(单机) | 分布式锁 |
|---|---|---|
| 作用范围 | 单个进程内 | 跨多个进程、多个机器 |
| 实现方式 | 内存锁(如Java的synchronized) | 基于共享存储(如Redis、ZooKeeper) |
| 复杂性 | 简单 | 复杂(需要考虑网络延迟、节点故障等) |
| 可靠性 | 高 | 取决于实现方式和底层存储的可靠性 |
Redis实现分布式锁的原理
Redis实现分布式锁的核心思想是利用Redis的原子操作来确保只有一个进程能成功获取锁。
基本实现思路
- 获取锁:使用SET命令(带NX选项)尝试设置一个键,成功表示获取锁
- 释放锁:使用DEL命令删除锁键
- 避免死锁:给锁设置过期时间,确保即使进程崩溃也能自动释放锁
基本实现
1. 简单实现
使用Redis的SET命令(带NX和EX选项)实现基本的分布式锁。
import redis
import time
import uuid
# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def acquire_lock(lock_name, expire_time=10):
"""获取分布式锁"""
# 生成唯一标识符,用于释放锁时的身份验证
identifier = str(uuid.uuid4())
# 锁的键名
lock_key = f"lock:{lock_name}"
# 使用SET命令尝试获取锁
# NX: 只有当键不存在时才设置
# EX: 设置过期时间(秒)
result = redis_client.set(lock_key, identifier, nx=True, ex=expire_time)
if result:
return identifier
return None
def release_lock(lock_name, identifier):
"""释放分布式锁"""
lock_key = f"lock:{lock_name}"
# 获取锁的当前值
current_identifier = redis_client.get(lock_key)
# 验证锁的持有者是否是当前进程
if current_identifier and current_identifier.decode('utf-8') == identifier:
# 释放锁
redis_client.delete(lock_key)
return True
return False
# 使用示例
def process_shared_resource():
"""处理共享资源"""
# 尝试获取锁,过期时间10秒
lock_id = acquire_lock('shared_resource', 10)
if lock_id:
try:
print("获取锁成功,开始处理共享资源...")
# 模拟处理共享资源的操作
time.sleep(5)
print("共享资源处理完成")
finally:
# 释放锁
release_lock('shared_resource', lock_id)
print("锁已释放")
else:
print("获取锁失败,资源忙")2. 使用Lua脚本释放锁
上述实现存在一个问题:在获取锁值和删除锁之间可能存在竞态条件。使用Lua脚本可以确保释放锁操作的原子性。
def release_lock_safely(lock_name, identifier):
"""安全释放分布式锁"""
lock_key = f"lock:{lock_name}"
# Lua脚本,确保获取锁值和删除锁的原子性
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
# 执行Lua脚本
result = redis_client.eval(script, 1, lock_key, identifier)
return result == 1
# 更新释放锁函数
def release_lock(lock_name, identifier):
return release_lock_safely(lock_name, identifier)高级实现
1. 可重入锁
可重入锁允许同一个进程多次获取同一把锁,避免了死锁的风险。
class RedisReentrantLock:
"""Redis可重入锁"""
def __init__(self, redis_client, lock_name, expire_time=10):
self.redis_client = redis_client
self.lock_name = lock_name
self.expire_time = expire_time
self.lock_key = f"lock:{lock_name}"
# 线程本地存储,记录当前线程的锁获取次数
import threading
self.local = threading.local()
def acquire(self):
"""获取锁"""
# 生成唯一标识符
identifier = str(uuid.uuid4())
# 检查本地存储中是否已有锁信息
if hasattr(self.local, 'lock_count') and hasattr(self.local, 'identifier'):
# 已获取过锁,增加计数
self.local.lock_count += 1
# 刷新锁的过期时间
self.redis_client.expire(self.lock_key, self.expire_time)
return True
# 尝试获取锁
result = self.redis_client.set(self.lock_key, identifier, nx=True, ex=self.expire_time)
if result:
# 获取锁成功,记录锁信息到本地存储
self.local.lock_count = 1
self.local.identifier = identifier
return True
return False
def release(self):
"""释放锁"""
# 检查本地存储中是否有锁信息
if not (hasattr(self.local, 'lock_count') and hasattr(self.local, 'identifier')):
return False
# 减少计数
self.local.lock_count -= 1
if self.local.lock_count <= 0:
# 计数为0,真正释放锁
identifier = self.local.identifier
# 清理本地存储
delattr(self.local, 'lock_count')
delattr(self.local, 'identifier')
# 使用Lua脚本安全释放锁
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = self.redis_client.eval(script, 1, self.lock_key, identifier)
return result == 1
else:
# 计数不为0,只刷新过期时间
self.redis_client.expire(self.lock_key, self.expire_time)
return True
def __enter__(self):
"""支持with语句"""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""支持with语句"""
self.release()
# 使用示例
lock = RedisReentrantLock(redis_client, 'shared_resource', 10)
def nested_operation():
"""嵌套操作共享资源"""
with lock:
print("外层获取锁")
# 模拟外层操作
time.sleep(1)
with lock:
print("内层获取锁")
# 模拟内层操作
time.sleep(1)
print("内层释放锁")
print("外层释放锁")2. 公平锁
公平锁按照请求顺序分配锁,避免了饥饿现象。
class RedisFairLock:
"""Redis公平锁"""
def __init__(self, redis_client, lock_name, expire_time=10):
self.redis_client = redis_client
self.lock_name = lock_name
self.expire_time = expire_time
self.lock_key = f"lock:{lock_name}"
self.queue_key = f"lock:{lock_name}:queue"
self.identifier = str(uuid.uuid4())
def acquire(self, timeout=5):
"""获取锁,支持超时"""
start_time = time.time()
# 1. 将自己加入等待队列
# 使用时间戳作为分数,确保公平性
timestamp = time.time()
self.redis_client.zadd(self.queue_key, {self.identifier: timestamp})
try:
while time.time() - start_time < timeout:
# 2. 尝试获取锁(只有队列头部的进程才能获取)
# 获取队列头部元素
top_element = self.redis_client.zrange(self.queue_key, 0, 0)
if top_element and top_element[0].decode('utf-8') == self.identifier:
# 是队列头部,可以尝试获取锁
result = self.redis_client.set(self.lock_key, self.identifier, nx=True, ex=self.expire_time)
if result:
# 获取锁成功
return True
# 短暂休眠,避免忙等
time.sleep(0.1)
# 超时
return False
finally:
# 3. 如果获取锁失败,从队列中移除自己
if not self._is_holding_lock():
self.redis_client.zrem(self.queue_key, self.identifier)
def release(self):
"""释放锁"""
# 1. 安全释放锁
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = self.redis_client.eval(script, 1, self.lock_key, self.identifier)
# 2. 从队列中移除自己
self.redis_client.zrem(self.queue_key, self.identifier)
return result == 1
def _is_holding_lock(self):
"""检查是否持有锁"""
current_identifier = self.redis_client.get(self.lock_key)
return current_identifier and current_identifier.decode('utf-8') == self.identifier
def __enter__(self):
"""支持with语句"""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""支持with语句"""
self.release()
# 使用示例
def worker(worker_id):
"""模拟工作线程"""
lock = RedisFairLock(redis_client, 'shared_resource', 10)
print(f"Worker {worker_id} 尝试获取锁")
if lock.acquire(timeout=10):
try:
print(f"Worker {worker_id} 获取锁成功,开始工作")
# 模拟工作
time.sleep(2)
print(f"Worker {worker_id} 工作完成")
finally:
lock.release()
print(f"Worker {worker_id} 释放锁")
else:
print(f"Worker {worker_id} 获取锁失败")
# 启动多个工作线程
import threading
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
t.start()3. 红锁算法(RedLock)
红锁算法是Redis官方推荐的分布式锁实现方案,它通过在多个Redis节点上获取锁来提高可靠性。
红锁算法的步骤
- 获取当前时间:记录开始获取锁的时间
- 尝试在所有节点上获取锁:在每个节点上尝试获取锁,使用相同的过期时间
- 计算获取锁的时间:计算从开始到成功获取锁的时间
- 验证获取锁是否成功:
- 成功获取锁的节点数大于等于多数派(N/2 + 1)
- 获取锁的时间小于锁的过期时间
- 重新计算锁的过期时间:原始过期时间减去获取锁的时间
- 释放锁:在所有节点上释放锁(无论是否成功获取)
红锁实现
class RedisRedLock:
"""Redis红锁实现"""
def __init__(self, redis_clients, lock_name, expire_time=10, retry_count=3, retry_delay=0.1):
"""
初始化红锁
:param redis_clients: Redis客户端列表
:param lock_name: 锁名称
:param expire_time: 锁过期时间(秒)
:param retry_count: 获取锁失败时的重试次数
:param retry_delay: 重试间隔(秒)
"""
self.redis_clients = redis_clients
self.lock_name = lock_name
self.expire_time = expire_time
self.retry_count = retry_count
self.retry_delay = retry_delay
self.identifier = str(uuid.uuid4())
self.lock_keys = [f"lock:{lock_name}:{i}" for i in range(len(redis_clients))]
def acquire(self):
"""获取红锁"""
for attempt in range(self.retry_count + 1):
acquired_locks = []
start_time = time.time()
# 尝试在所有节点上获取锁
for i, client in enumerate(self.redis_clients):
try:
result = client.set(self.lock_keys[i], self.identifier, nx=True, ex=self.expire_time)
if result:
acquired_locks.append(i)
except Exception as e:
# 节点故障,跳过
print(f"节点 {i} 故障: {e}")
# 计算获取锁的时间
acquire_time = time.time() - start_time
# 验证获取锁是否成功
majority = len(self.redis_clients) // 2 + 1
if len(acquired_locks) >= majority and acquire_time < self.expire_time:
# 获取锁成功,重新计算过期时间
self.validity_time = self.expire_time - acquire_time
print(f"获取红锁成功,在 {len(acquired_locks)} 个节点上获取了锁")
return True
# 获取锁失败,释放已获取的锁
for i in acquired_locks:
try:
self.redis_clients[i].delete(self.lock_keys[i])
except Exception:
pass
# 重试
if attempt < self.retry_count:
print(f"获取红锁失败,{self.retry_delay}秒后重试")
time.sleep(self.retry_delay)
else:
print("获取红锁失败,已达到最大重试次数")
return False
def release(self):
"""释放红锁"""
# 释放所有节点上的锁
released_count = 0
for i, client in enumerate(self.redis_clients):
try:
# 使用Lua脚本安全释放锁
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else:
return 0
end
"""
result = client.eval(script, 1, self.lock_keys[i], self.identifier)
if result == 1:
released_count += 1
except Exception as e:
# 节点故障,跳过
print(f"释放节点 {i} 锁时故障: {e}")
print(f"释放红锁完成,释放了 {released_count} 个节点的锁")
return True
def __enter__(self):
"""支持with语句"""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""支持with语句"""
self.release()
# 使用示例
# 创建多个Redis客户端(模拟多个节点)
redis_clients = [
redis.Redis(host='localhost', port=6379, db=0),
redis.Redis(host='localhost', port=6380, db=0), # 假设这是另一个Redis节点
redis.Redis(host='localhost', port=6381, db=0) # 假设这是第三个Redis节点
]
# 初始化红锁
red_lock = RedisRedLock(redis_clients, 'critical_resource', 10)
def critical_operation():
"""执行关键操作"""
if red_lock.acquire():
try:
print("获取红锁成功,执行关键操作")
# 模拟关键操作
time.sleep(5)
print("关键操作完成")
finally:
red_lock.release()
print("红锁已释放")
else:
print("获取红锁失败,无法执行关键操作")实际应用场景
1. 分布式任务调度
在分布式系统中,确保只有一个实例执行特定任务。
def schedule_task(task_name):
"""调度任务"""
lock_name = f"task:{task_name}"
# 尝试获取锁
lock_id = acquire_lock(lock_name, 60) # 锁过期时间60秒
if lock_id:
try:
print(f"开始执行任务: {task_name}")
# 执行任务
execute_task(task_name)
print(f"任务执行完成: {task_name}")
finally:
# 释放锁
release_lock(lock_name, lock_id)
else:
print(f"任务 {task_name} 已在执行中")
def execute_task(task_name):
"""执行具体任务"""
# 模拟任务执行
time.sleep(5)
print(f"执行 {task_name} 的具体逻辑")
# 示例:定时清理过期数据
schedule_task("cleanup_expired_data")2. 库存管理
在电商系统中,确保库存不会超卖。
def deduct_inventory(product_id, quantity):
"""扣减库存"""
lock_name = f"inventory:{product_id}"
# 尝试获取锁
lock_id = acquire_lock(lock_name, 10)
if lock_id:
try:
# 获取当前库存
current_stock = redis_client.get(f"stock:{product_id}")
if not current_stock:
print(f"商品 {product_id} 不存在")
return False
current_stock = int(current_stock)
# 检查库存是否充足
if current_stock < quantity:
print(f"商品 {product_id} 库存不足")
return False
# 扣减库存
new_stock = current_stock - quantity
redis_client.set(f"stock:{product_id}", new_stock)
print(f"商品 {product_id} 库存扣减成功,当前库存: {new_stock}")
return True
finally:
# 释放锁
release_lock(lock_name, lock_id)
else:
print(f"商品 {product_id} 库存正在处理中,请稍后再试")
return False
# 初始化库存
redis_client.set("stock:product1", 100)
# 模拟并发扣减库存
import threading
def simulate_concurrent_deduction():
"""模拟并发扣减库存"""
threads = []
for i in range(10):
t = threading.Thread(target=deduct_inventory, args=("product1", 10))
threads.append(t)
t.start()
for t in threads:
t.join()
# 检查最终库存
final_stock = redis_client.get("stock:product1")
print(f"最终库存: {final_stock}")
simulate_concurrent_deduction()3. 分布式限流
结合分布式锁实现简单的分布式限流。
def rate_limit(key, limit, window):
"""
分布式限流
:param key: 限流键
:param limit: 时间窗口内的最大请求数
:param window: 时间窗口(秒)
:return: 是否允许请求
"""
# 使用滑动窗口计数
current_time = int(time.time())
window_start = current_time - window
# 构建键名
count_key = f"rate_limit:{key}"
lock_key = f"rate_limit_lock:{key}"
# 获取锁
lock_id = acquire_lock(lock_key, 1) # 锁过期时间1秒
if lock_id:
try:
# 移除窗口外的计数
redis_client.zremrangebyscore(count_key, 0, window_start)
# 获取当前计数
current_count = redis_client.zcard(count_key)
if current_count < limit:
# 允许请求,增加计数
redis_client.zadd(count_key, {str(uuid.uuid4()): current_time})
# 设置过期时间,避免内存泄漏
redis_client.expire(count_key, window)
return True
else:
# 超出限制
return False
finally:
# 释放锁
release_lock(lock_key, lock_id)
else:
# 获取锁失败,默认允许请求(避免因锁竞争导致服务不可用)
return True
# 使用示例
def handle_request(user_id):
"""处理用户请求"""
# 限制每个用户每分钟最多100个请求
if rate_limit(f"user:{user_id}", 100, 60):
print(f"处理用户 {user_id} 的请求")
# 处理请求逻辑
else:
print(f"用户 {user_id} 的请求过于频繁,请稍后再试")
# 模拟用户请求
handle_request("user123")性能优化
1. 减少锁的粒度
将大锁拆分为多个小锁,减少锁竞争。
# 优化前:使用一个大锁保护所有商品的库存
def deduct_inventory_all(product_id, quantity):
lock_id = acquire_lock("inventory:all", 10)
if lock_id:
try:
# 处理库存扣减
pass
finally:
release_lock("inventory:all", lock_id)
# 优化后:使用细粒度锁,每个商品一个锁
def deduct_inventory_per_product(product_id, quantity):
lock_id = acquire_lock(f"inventory:{product_id}", 10)
if lock_id:
try:
# 处理库存扣减
pass
finally:
release_lock(f"inventory:{product_id}", lock_id)2. 使用异步获取锁
对于非关键操作,使用异步方式获取锁,避免阻塞主线程。
import asyncio
async def acquire_lock_async(lock_name, expire_time=10):
"""异步获取锁"""
# 生成唯一标识符
identifier = str(uuid.uuid4())
lock_key = f"lock:{lock_name}"
# 尝试获取锁
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, redis_client.set, lock_key, identifier, 'NX', 'EX', expire_time)
if result:
return identifier
return None
async def process_async():
"""异步处理"""
lock_id = await acquire_lock_async('async_task', 10)
if lock_id:
try:
print("获取锁成功,开始异步处理")
# 模拟异步操作
await asyncio.sleep(5)
print("异步处理完成")
finally:
# 释放锁
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, release_lock, 'async_task', lock_id)
print("锁已释放")
# 运行异步函数
asyncio.run(process_async())3. 缓存锁信息
对于频繁获取的锁,可以在本地缓存锁信息,减少Redis操作。
class LockCache:
"""锁缓存"""
def __init__(self):
self.locks = {}
self.lock_expiry = {}
def is_locked_locally(self, lock_name):
"""检查锁是否在本地缓存中"""
if lock_name in self.locks:
# 检查锁是否过期
if time.time() < self.lock_expiry.get(lock_name, 0):
return True
# 锁已过期,从缓存中移除
del self.locks[lock_name]
del self.lock_expiry[lock_name]
return False
def add_lock(self, lock_name, identifier, expiry):
"""添加锁到缓存"""
self.locks[lock_name] = identifier
self.lock_expiry[lock_name] = time.time() + expiry
def remove_lock(self, lock_name):
"""从缓存中移除锁"""
if lock_name in self.locks:
del self.locks[lock_name]
del self.lock_expiry[lock_name]
# 创建锁缓存
lock_cache = LockCache()
# 优化获取锁函数
def acquire_lock_with_cache(lock_name, expire_time=10):
"""使用缓存获取锁"""
# 先检查本地缓存
if lock_cache.is_locked_locally(lock_name):
return None
# 尝试从Redis获取锁
identifier = acquire_lock(lock_name, expire_time)
if identifier:
# 添加到本地缓存
lock_cache.add_lock(lock_name, identifier, expire_time * 0.8) # 缓存时间略短于锁过期时间
return identifier
# 优化释放锁函数
def release_lock_with_cache(lock_name, identifier):
"""使用缓存释放锁"""
# 从本地缓存中移除
lock_cache.remove_lock(lock_name)
# 从Redis释放
return release_lock(lock_name, identifier)安全性考虑
1. 锁过期时间的设置
- 过短:可能导致锁在操作完成前就过期,造成并发问题
- 过长:可能导致系统在进程崩溃后长时间无法获取锁
建议:根据操作的平均执行时间设置过期时间,通常为平均执行时间的2-3倍。
2. 网络延迟的影响
在分布式环境中,网络延迟可能导致锁的状态不一致。
解决方案:
- 使用Lua脚本确保操作的原子性
- 实现锁的续约机制,对于长时间运行的操作
3. 节点故障的处理
Redis节点故障可能导致锁丢失。
解决方案:
- 使用Redis集群或哨兵模式提高可用性
- 对于关键场景,使用红锁算法
4. 锁竞争的处理
高并发场景下,锁竞争可能导致性能下降。
解决方案:
- 使用细粒度锁
- 实现锁的等待队列
- 合理设置重试策略
最佳实践
选择合适的实现方式:
- 简单场景:使用基本实现
- 需要可重入性:使用可重入锁实现
- 高可靠性要求:使用红锁实现
合理设置锁的过期时间:
- 根据操作的执行时间设置
- 考虑添加锁续约机制
使用Lua脚本确保原子性:
- 获取锁时使用SET命令(带NX选项)
- 释放锁时使用Lua脚本
实现错误处理和重试机制:
- 处理Redis连接失败的情况
- 实现合理的重试策略
监控和告警:
- 监控锁的获取和释放情况
- 对长时间持有锁的情况进行告警
测试锁的可靠性:
- 模拟各种故障场景(进程崩溃、网络分区等)
- 测试并发性能
小结
Redis是实现分布式锁的优秀选择,它提供了高性能、简单易用的特性。通过本教程的学习,你应该已经掌握了:
- Redis分布式锁的基本原理和实现方法
- 不同类型的分布式锁实现(简单锁、可重入锁、公平锁、红锁)
- 分布式锁在实际场景中的应用
- 性能优化和安全性考虑
- 最佳实践和注意事项
在实际项目中,选择合适的分布式锁实现方式需要考虑系统的可靠性要求、性能需求以及复杂度等因素。对于关键业务场景,建议使用经过验证的开源库(如Redisson)来实现分布式锁,以确保可靠性和安全性。