Redis发布订阅模式
1. 发布订阅模式基础
1.1 什么是发布订阅模式
发布订阅(Publish/Subscribe,简称Pub/Sub)是一种消息通信模式,其中消息发送者(发布者)不直接将消息发送给特定的接收者(订阅者),而是将消息分类为不同的频道(Channel),订阅者可以订阅感兴趣的频道,接收该频道的所有消息。
核心概念:
- 发布者(Publisher):消息的发送者,将消息发送到指定频道
- 订阅者(Subscriber):消息的接收者,订阅一个或多个频道
- 频道(Channel):消息的分类标识,发布者向频道发送消息,订阅者从频道接收消息
- 模式(Pattern):支持通配符的频道匹配模式,订阅者可以通过模式订阅多个相关频道
1.2 发布订阅模式的优缺点
优点:
- 松耦合:发布者和订阅者之间没有直接依赖关系
- 广播机制:一条消息可以同时发送给多个订阅者
- 实时性:消息实时传递,适合实时通知场景
- 灵活性:支持频道和模式订阅,满足不同的订阅需求
- 简单易用:Redis提供了简洁的命令接口
缺点:
- 消息不持久:默认情况下,Redis Pub/Sub不持久化消息,订阅者离线期间的消息会丢失
- 没有消息确认机制:发布者无法知道消息是否被订阅者接收和处理
- 不支持消息重试:订阅者处理失败后,无法重新获取消息
- 有限的消息堆积能力:消息在Redis内存中传递,不适合大量消息堆积的场景
- 单节点故障:如果Redis节点故障,所有Pub/Sub连接都会断开
2. Redis Pub/Sub核心命令
2.1 基本命令
发布消息:
PUBLISH channel message:向指定频道发布消息channel:频道名称message:消息内容- 返回值:接收到消息的订阅者数量
订阅频道:
SUBSCRIBE channel [channel ...]:订阅一个或多个频道channel:频道名称- 一旦订阅,客户端会进入订阅模式,只能接收消息和执行少数几个命令(如UNSUBSCRIBE、PSUBSCRIBE等)
取消订阅:
UNSUBSCRIBE [channel [channel ...]]:取消订阅一个或多个频道- 如果不指定频道,则取消订阅所有频道
模式订阅:
PSUBSCRIBE pattern [pattern ...]:订阅匹配指定模式的频道pattern:频道模式,支持通配符*(匹配任意字符)和?(匹配单个字符)
取消模式订阅:
PUNSUBSCRIBE [pattern [pattern ...]]:取消订阅匹配指定模式的频道- 如果不指定模式,则取消订阅所有模式
2.2 查看订阅信息
PUBSUB CHANNELS [pattern]:列出当前活跃的频道(有订阅者的频道)PUBSUB NUMSUB [channel [channel ...]]:查看指定频道的订阅者数量PUBSUB NUMPAT:查看当前模式订阅的数量
3. 基本使用示例
3.1 命令行示例
步骤1:启动第一个Redis客户端作为订阅者
# 连接Redis
redis-cli
# 订阅频道
127.0.0.1:6379> SUBSCRIBE news:sports news:tech
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news:sports"
3) (integer) 1
1) "subscribe"
2) "news:tech"
3) (integer) 2步骤2:启动第二个Redis客户端作为发布者
# 连接Redis
redis-cli
# 向sports频道发布消息
127.0.0.1:6379> PUBLISH news:sports "Lakers won the game!"
(integer) 1
# 向tech频道发布消息
127.0.0.1:6379> PUBLISH news:tech "New iPhone released!"
(integer) 1
# 向未订阅的频道发布消息
127.0.0.1:6379> PUBLISH news:entertainment "New movie coming soon!"
(integer) 0步骤3:查看订阅者接收到的消息
# 在订阅者客户端可以看到以下输出
1) "message"
2) "news:sports"
3) "Lakers won the game!"
1) "message"
2) "news:tech"
3) "New iPhone released!"3.2 模式订阅示例
步骤1:启动Redis客户端作为模式订阅者
# 连接Redis
redis-cli
# 订阅所有news开头的频道
127.0.0.1:6379> PSUBSCRIBE news:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news:*"
3) (integer) 1步骤2:启动另一个Redis客户端作为发布者
# 连接Redis
redis-cli
# 向不同的news频道发布消息
127.0.0.1:6379> PUBLISH news:sports "Basketball game tomorrow"
(integer) 1
127.0.0.1:6379> PUBLISH news:tech "AI breakthrough"
(integer) 1
127.0.0.1:6379> PUBLISH news:health "Exercise tips"
(integer) 1步骤3:查看模式订阅者接收到的消息
# 在模式订阅者客户端可以看到以下输出
1) "pmessage"
2) "news:*"
3) "news:sports"
4) "Basketball game tomorrow"
1) "pmessage"
2) "news:*"
3) "news:tech"
4) "AI breakthrough"
1) "pmessage"
2) "news:*"
3) "news:health"
4) "Exercise tips"4. 编程语言实现
4.1 Python实现
安装依赖:
pip install redis发布者示例:
import redis
import time
# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def publisher():
"""消息发布者"""
channels = ['chat:general', 'chat:tech', 'chat:sports']
messages = [
'Hello everyone!',
'Redis is awesome!',
'Did you watch the game last night?',
'Python 3.10 has new features.',
'The team won the championship!'
]
for i, message in enumerate(messages):
# 循环使用不同的频道
channel = channels[i % len(channels)]
# 发布消息
result = redis_client.publish(channel, message)
print(f"发布到频道 {channel}: {message},{result} 个订阅者接收到")
time.sleep(1) # 每秒发布一条消息
if __name__ == '__main__':
publisher()订阅者示例:
import redis
import threading
def subscriber():
"""消息订阅者"""
# 创建新的Redis连接(订阅者需要单独的连接)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
pubsub = redis_client.pubsub()
# 订阅频道
pubsub.subscribe('chat:general', 'chat:tech')
# 模式订阅
pubsub.psubscribe('chat:*')
print("开始接收消息...")
# 监听消息
for message in pubsub.listen():
# 过滤掉订阅确认消息
if message['type'] == 'message':
print(f"接收到频道 {message['channel'].decode()} 的消息: {message['data'].decode()}")
elif message['type'] == 'pmessage':
print(f"通过模式 {message['pattern'].decode()} 接收到频道 {message['channel'].decode()} 的消息: {message['data'].decode()}")
elif message['type'] in ['subscribe', 'psubscribe']:
print(f"成功订阅 {message['channel'].decode()}")
elif message['type'] in ['unsubscribe', 'punsubscribe']:
print(f"取消订阅 {message['channel'].decode()}")
def unsubscribe_after_delay(pubsub, delay=10):
"""延迟取消订阅"""
import time
time.sleep(delay)
print("取消订阅 chat:tech")
pubsub.unsubscribe('chat:tech')
time.sleep(5)
print("取消订阅所有频道")
pubsub.unsubscribe()
pubsub.punsubscribe()
if __name__ == '__main__':
# 启动订阅者线程
sub_thread = threading.Thread(target=subscriber)
sub_thread.daemon = True
sub_thread.start()
# 保持主线程运行
try:
while True:
pass
except KeyboardInterrupt:
print("退出程序")4.2 Node.js实现
安装依赖:
npm install redis发布者示例:
const redis = require('redis');
// 创建Redis客户端
const publisher = redis.createClient({
host: 'localhost',
port: 6379
});
// 连接Redis
publisher.connect().then(() => {
console.log('发布者连接成功');
// 发布消息
const channels = ['notifications:users', 'notifications:orders', 'notifications:system'];
const messages = [
JSON.stringify({ type: 'welcome', user_id: 123, message: 'Welcome to our platform!' }),
JSON.stringify({ type: 'order_status', order_id: 456, status: 'shipped' }),
JSON.stringify({ type: 'maintenance', message: 'System maintenance scheduled for tonight' }),
JSON.stringify({ type: 'promotion', user_id: 123, discount: '20%' }),
JSON.stringify({ type: 'order_status', order_id: 789, status: 'delivered' })
];
// 定时发布消息
let index = 0;
const interval = setInterval(() => {
if (index < messages.length) {
const channel = channels[index % channels.length];
const message = messages[index];
publisher.publish(channel, message).then(count => {
console.log(`发布到频道 ${channel}: ${message},${count} 个订阅者接收到`);
}).catch(err => {
console.error('发布消息失败:', err);
});
index++;
} else {
clearInterval(interval);
publisher.disconnect();
console.log('发布完成,断开连接');
}
}, 1000);
}).catch(err => {
console.error('发布者连接失败:', err);
});订阅者示例:
const redis = require('redis');
// 创建Redis客户端
const subscriber = redis.createClient({
host: 'localhost',
port: 6379
});
// 连接Redis
subscriber.connect().then(() => {
console.log('订阅者连接成功');
// 订阅频道
subscriber.subscribe('notifications:users', (message) => {
console.log('接收到用户通知:', message);
});
subscriber.subscribe('notifications:orders', (message) => {
console.log('接收到订单通知:', message);
});
// 模式订阅
subscriber.pSubscribe('notifications:*', (message, channel) => {
console.log(`通过模式订阅接收到 ${channel} 的消息:`, message);
});
console.log('开始接收消息...');
// 10秒后取消订阅
setTimeout(() => {
console.log('取消订阅 notifications:orders');
subscriber.unsubscribe('notifications:orders');
// 15秒后取消所有订阅
setTimeout(() => {
console.log('取消所有订阅');
subscriber.unsubscribe();
subscriber.pUnsubscribe();
// 断开连接
setTimeout(() => {
subscriber.disconnect();
console.log('断开连接');
}, 2000);
}, 5000);
}, 10000);
}).catch(err => {
console.error('订阅者连接失败:', err);
});5. 发布订阅模式的高级特性
5.1 频道命名规范
推荐的频道命名规范:
- 使用冒号
:作为分隔符,如service:component:action - 从一般到具体,如
chat:room:123 - 使用小写字母和下划线
- 保持频道名称简洁明了
示例:
user:login- 用户登录事件order:created- 订单创建事件payment:completed- 支付完成事件system:error- 系统错误事件notification:user:123- 特定用户的通知
5.2 消息格式
推荐的消息格式:
- 使用JSON格式,便于结构化数据传输
- 包含必要的元数据,如消息类型、时间戳等
- 保持消息大小合理,避免过大的消息
示例消息格式:
{
"type": "order_status_change",
"timestamp": 1620000000000,
"data": {
"order_id": "123456",
"status": "shipped",
"tracking_number": "TN1234567890",
"updated_at": "2023-05-03T10:30:00Z"
},
"version": "1.0"
}5.3 消息持久化方案
由于Redis Pub/Sub默认不持久化消息,可以通过以下方式实现消息持久化:
方案1:结合List实现持久化
- 发布者在发布消息到频道的同时,将消息存储到List中
- 订阅者在启动时,先从List中获取历史消息,再订阅实时消息
实现示例:
def publish_with_persistence(channel, message):
"""发布消息并持久化"""
# 发布到频道
redis_client.publish(channel, message)
# 存储到List(设置过期时间,避免内存占用过大)
history_key = f"history:{channel}"
redis_client.lpush(history_key, message)
redis_client.ltrim(history_key, 0, 999) # 只保留最近1000条消息
redis_client.expire(history_key, 86400) # 24小时过期
def get_history_messages(channel, limit=100):
"""获取历史消息"""
history_key = f"history:{channel}"
return redis_client.lrange(history_key, 0, limit-1)方案2:使用Redis Stream
Redis 5.0+ 引入的Stream数据结构,提供了更完善的消息队列功能,支持:
- 消息持久化
- 消息ID和顺序
- 消费者组
- 消息确认
- 消息回溯
实现示例:
def publish_to_stream(stream, message):
"""发布消息到Stream"""
# 添加消息到Stream
return redis_client.xadd(stream, message)
def consume_from_stream(stream, consumer_group, consumer_name, start_id='>'):
"""从Stream消费消息"""
# 创建消费者组(如果不存在)
try:
redis_client.xgroup_create(stream, consumer_group, id='0', mkstream=True)
except Exception:
# 消费者组已存在,忽略错误
pass
# 消费消息
while True:
messages = redis_client.xreadgroup(
consumer_group, consumer_name, {stream: start_id}, count=1, block=0
)
if messages:
for message in messages:
stream_name, message_list = message
for msg_id, msg_data in message_list:
print(f"接收到消息 {msg_id}: {msg_data}")
# 处理消息
# 确认消息处理完成
redis_client.xack(stream, consumer_group, msg_id)6. 实际应用场景
6.1 实时聊天系统
需求:实现用户之间的实时聊天功能,支持不同的聊天频道。
实现:
# 发布消息(发送聊天消息)
def send_chat_message(channel, user_id, username, content):
message = {
'user_id': user_id,
'username': username,
'content': content,
'timestamp': time.time()
}
redis_client.publish(f"chat:{channel}", json.dumps(message))
# 订阅消息(接收聊天消息)
def subscribe_to_chat(channel):
pubsub = redis_client.pubsub()
pubsub.subscribe(f"chat:{channel}")
for message in pubsub.listen():
if message['type'] == 'message':
chat_message = json.loads(message['data'])
print(f"[{chat_message['username']}] {chat_message['content']}")6.2 系统通知
需求:实现系统级别的通知功能,如用户注册、订单状态变更、系统维护等。
实现:
def send_notification(notification_type, user_id=None, data=None):
"""发送通知"""
message = {
'type': notification_type,
'user_id': user_id,
'data': data,
'timestamp': time.time()
}
# 发送到用户特定频道(如果指定了用户)
if user_id:
redis_client.publish(f"notification:user:{user_id}", json.dumps(message))
# 发送到类型频道
redis_client.publish(f"notification:type:{notification_type}", json.dumps(message))
# 发送到所有通知频道
redis_client.publish("notification:all", json.dumps(message))
# 示例:发送订单状态变更通知
send_notification(
'order_status',
user_id=123,
data={'order_id': 456, 'status': 'shipped', 'tracking_number': 'TN123456'}
)6.3 实时监控和告警
需求:实现系统监控和告警功能,当系统指标超过阈值时,及时通知相关人员。
实现:
def monitor_system_metrics():
"""监控系统指标"""
while True:
# 模拟获取系统指标
cpu_usage = random.uniform(0, 100)
memory_usage = random.uniform(0, 100)
disk_usage = random.uniform(0, 100)
# 发布指标数据
metrics = {
'cpu': cpu_usage,
'memory': memory_usage,
'disk': disk_usage,
'timestamp': time.time()
}
redis_client.publish('metrics:system', json.dumps(metrics))
# 检查是否需要告警
if cpu_usage > 90:
alert = {
'level': 'critical',
'message': f'CPU usage is too high: {cpu_usage:.2f}%',
'timestamp': time.time()
}
redis_client.publish('alert:system', json.dumps(alert))
time.sleep(5) # 每5秒检查一次
# 告警订阅者
def alert_subscriber():
pubsub = redis_client.pubsub()
pubsub.subscribe('alert:system')
for message in pubsub.listen():
if message['type'] == 'message':
alert = json.loads(message['data'])
print(f"【{alert['level'].upper()}】{alert['message']}")
# 可以集成邮件、短信等通知方式6.4 分布式系统事件总线
需求:在分布式系统中,不同服务之间通过事件总线进行通信。
实现:
class EventBus:
"""分布式事件总线"""
def __init__(self, redis_client):
self.redis_client = redis_client
def publish(self, event_type, data):
"""发布事件"""
event = {
'event_type': event_type,
'data': data,
'timestamp': time.time(),
'event_id': str(uuid.uuid4())
}
channel = f"event:{event_type}"
result = self.redis_client.publish(channel, json.dumps(event))
return {
'event_id': event['event_id'],
'channel': channel,
'subscribers_count': result
}
def subscribe(self, event_type, callback):
"""订阅事件"""
def _subscribe_handler():
pubsub = self.redis_client.pubsub()
pubsub.subscribe(f"event:{event_type}")
for message in pubsub.listen():
if message['type'] == 'message':
event = json.loads(message['data'])
callback(event)
# 启动订阅线程
thread = threading.Thread(target=_subscribe_handler)
thread.daemon = True
thread.start()
return thread
# 使用示例
event_bus = EventBus(redis_client)
# 订阅用户注册事件
def handle_user_registered(event):
print(f"用户注册事件: {event['data']}")
# 处理用户注册后的逻辑,如发送欢迎邮件、初始化用户数据等
event_bus.subscribe('user.registered', handle_user_registered)
# 发布用户注册事件
event_bus.publish('user.registered', {
'user_id': 123,
'username': 'john_doe',
'email': 'john@example.com'
})7. 最佳实践
7.1 性能优化
- 合理使用频道:避免创建过多频道,建议根据业务逻辑合理分类
- 消息大小控制:消息内容不宜过大,建议控制在1KB以内
- 使用管道:发布多条消息时,使用管道(Pipeline)减少网络往返时间
- 连接管理:订阅者应使用单独的Redis连接,避免与其他操作混用
- 模式订阅优化:避免使用过于宽泛的模式(如
*),减少不必要的消息传递
7.2 可靠性保障
- 考虑使用Redis Stream:对于需要消息持久化和确认的场景,推荐使用Stream
- 实现消息重试机制:关键消息可以结合数据库或其他持久化方式,实现重试逻辑
- 监控订阅状态:定期检查订阅连接状态,确保订阅正常
- 使用Redis Sentinel或Cluster:提高Redis服务的可用性,减少因Redis故障导致的Pub/Sub中断
7.3 安全性考虑
- 限制Redis的网络访问:只允许必要的IP访问Redis服务
- 设置密码认证:防止未授权访问
- 使用ACL:限制Pub/Sub相关命令的权限
- 消息内容加密:对于敏感消息,考虑在发送前加密
- 避免在消息中存储敏感信息:如密码、令牌等
7.4 适用场景判断
适合使用Redis Pub/Sub的场景:
- 实时通知和告警
- 聊天和消息系统
- 系统事件广播
- 监控数据实时传输
- 微服务之间的轻量级通信
不适合的场景:
- 需要消息持久化的场景
- 对消息可靠性要求极高的场景
- 消息量极大的场景
- 需要复杂的消息路由和处理逻辑的场景
8. 总结
Redis Pub/Sub提供了一种简单高效的发布订阅机制,适合实时消息传递场景。通过本文的学习,我们了解了:
- 发布订阅模式的基本概念和优缺点
- Redis Pub/Sub的核心命令和使用方法
- 如何使用Python和Node.js实现发布订阅功能
- 如何结合其他Redis功能实现消息持久化
- 实际应用场景和最佳实践
在实际应用中,我们需要根据具体需求选择合适的消息传递方案。对于简单的实时通知场景,Redis Pub/Sub是一个轻量级的选择;对于需要更完善功能的场景,可以考虑使用Redis Stream或专业的消息队列系统(如RabbitMQ、Kafka等)。
通过合理使用Redis Pub/Sub,我们可以构建松耦合、实时性高的分布式系统,提高系统的可扩展性和维护性。