Redis发布订阅模式

1. 发布订阅模式基础

1.1 什么是发布订阅模式

发布订阅(Publish/Subscribe,简称Pub/Sub)是一种消息通信模式,其中消息发送者(发布者)不直接将消息发送给特定的接收者(订阅者),而是将消息分类为不同的频道(Channel),订阅者可以订阅感兴趣的频道,接收该频道的所有消息。

核心概念

  • 发布者(Publisher):消息的发送者,将消息发送到指定频道
  • 订阅者(Subscriber):消息的接收者,订阅一个或多个频道
  • 频道(Channel):消息的分类标识,发布者向频道发送消息,订阅者从频道接收消息
  • 模式(Pattern):支持通配符的频道匹配模式,订阅者可以通过模式订阅多个相关频道

1.2 发布订阅模式的优缺点

优点

  • 松耦合:发布者和订阅者之间没有直接依赖关系
  • 广播机制:一条消息可以同时发送给多个订阅者
  • 实时性:消息实时传递,适合实时通知场景
  • 灵活性:支持频道和模式订阅,满足不同的订阅需求
  • 简单易用:Redis提供了简洁的命令接口

缺点

  • 消息不持久:默认情况下,Redis Pub/Sub不持久化消息,订阅者离线期间的消息会丢失
  • 没有消息确认机制:发布者无法知道消息是否被订阅者接收和处理
  • 不支持消息重试:订阅者处理失败后,无法重新获取消息
  • 有限的消息堆积能力:消息在Redis内存中传递,不适合大量消息堆积的场景
  • 单节点故障:如果Redis节点故障,所有Pub/Sub连接都会断开

2. Redis Pub/Sub核心命令

2.1 基本命令

发布消息

  • PUBLISH channel message:向指定频道发布消息
    • channel:频道名称
    • message:消息内容
    • 返回值:接收到消息的订阅者数量

订阅频道

  • SUBSCRIBE channel [channel ...]:订阅一个或多个频道
    • channel:频道名称
    • 一旦订阅,客户端会进入订阅模式,只能接收消息和执行少数几个命令(如UNSUBSCRIBE、PSUBSCRIBE等)

取消订阅

  • UNSUBSCRIBE [channel [channel ...]]:取消订阅一个或多个频道
    • 如果不指定频道,则取消订阅所有频道

模式订阅

  • PSUBSCRIBE pattern [pattern ...]:订阅匹配指定模式的频道
    • pattern:频道模式,支持通配符 *(匹配任意字符)和 ?(匹配单个字符)

取消模式订阅

  • PUNSUBSCRIBE [pattern [pattern ...]]:取消订阅匹配指定模式的频道
    • 如果不指定模式,则取消订阅所有模式

2.2 查看订阅信息

  • PUBSUB CHANNELS [pattern]:列出当前活跃的频道(有订阅者的频道)
  • PUBSUB NUMSUB [channel [channel ...]]:查看指定频道的订阅者数量
  • PUBSUB NUMPAT:查看当前模式订阅的数量

3. 基本使用示例

3.1 命令行示例

步骤1:启动第一个Redis客户端作为订阅者

# 连接Redis
redis-cli

# 订阅频道
127.0.0.1:6379> SUBSCRIBE news:sports news:tech
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news:sports"
3) (integer) 1
1) "subscribe"
2) "news:tech"
3) (integer) 2

步骤2:启动第二个Redis客户端作为发布者

# 连接Redis
redis-cli

# 向sports频道发布消息
127.0.0.1:6379> PUBLISH news:sports "Lakers won the game!"
(integer) 1

# 向tech频道发布消息
127.0.0.1:6379> PUBLISH news:tech "New iPhone released!"
(integer) 1

# 向未订阅的频道发布消息
127.0.0.1:6379> PUBLISH news:entertainment "New movie coming soon!"
(integer) 0

步骤3:查看订阅者接收到的消息

# 在订阅者客户端可以看到以下输出
1) "message"
2) "news:sports"
3) "Lakers won the game!"
1) "message"
2) "news:tech"
3) "New iPhone released!"

3.2 模式订阅示例

步骤1:启动Redis客户端作为模式订阅者

# 连接Redis
redis-cli

# 订阅所有news开头的频道
127.0.0.1:6379> PSUBSCRIBE news:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news:*"
3) (integer) 1

步骤2:启动另一个Redis客户端作为发布者

# 连接Redis
redis-cli

# 向不同的news频道发布消息
127.0.0.1:6379> PUBLISH news:sports "Basketball game tomorrow"
(integer) 1

127.0.0.1:6379> PUBLISH news:tech "AI breakthrough"
(integer) 1

127.0.0.1:6379> PUBLISH news:health "Exercise tips"
(integer) 1

步骤3:查看模式订阅者接收到的消息

# 在模式订阅者客户端可以看到以下输出
1) "pmessage"
2) "news:*"
3) "news:sports"
4) "Basketball game tomorrow"
1) "pmessage"
2) "news:*"
3) "news:tech"
4) "AI breakthrough"
1) "pmessage"
2) "news:*"
3) "news:health"
4) "Exercise tips"

4. 编程语言实现

4.1 Python实现

安装依赖

pip install redis

发布者示例

import redis
import time

# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def publisher():
    """消息发布者"""
    channels = ['chat:general', 'chat:tech', 'chat:sports']
    messages = [
        'Hello everyone!',
        'Redis is awesome!',
        'Did you watch the game last night?',
        'Python 3.10 has new features.',
        'The team won the championship!'
    ]
    
    for i, message in enumerate(messages):
        # 循环使用不同的频道
        channel = channels[i % len(channels)]
        # 发布消息
        result = redis_client.publish(channel, message)
        print(f"发布到频道 {channel}: {message},{result} 个订阅者接收到")
        time.sleep(1)  # 每秒发布一条消息

if __name__ == '__main__':
    publisher()

订阅者示例

import redis
import threading

def subscriber():
    """消息订阅者"""
    # 创建新的Redis连接(订阅者需要单独的连接)
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = redis_client.pubsub()
    
    # 订阅频道
    pubsub.subscribe('chat:general', 'chat:tech')
    # 模式订阅
    pubsub.psubscribe('chat:*')
    
    print("开始接收消息...")
    
    # 监听消息
    for message in pubsub.listen():
        # 过滤掉订阅确认消息
        if message['type'] == 'message':
            print(f"接收到频道 {message['channel'].decode()} 的消息: {message['data'].decode()}")
        elif message['type'] == 'pmessage':
            print(f"通过模式 {message['pattern'].decode()} 接收到频道 {message['channel'].decode()} 的消息: {message['data'].decode()}")
        elif message['type'] in ['subscribe', 'psubscribe']:
            print(f"成功订阅 {message['channel'].decode()}")
        elif message['type'] in ['unsubscribe', 'punsubscribe']:
            print(f"取消订阅 {message['channel'].decode()}")

def unsubscribe_after_delay(pubsub, delay=10):
    """延迟取消订阅"""
    import time
    time.sleep(delay)
    print("取消订阅 chat:tech")
    pubsub.unsubscribe('chat:tech')
    time.sleep(5)
    print("取消订阅所有频道")
    pubsub.unsubscribe()
    pubsub.punsubscribe()

if __name__ == '__main__':
    # 启动订阅者线程
    sub_thread = threading.Thread(target=subscriber)
    sub_thread.daemon = True
    sub_thread.start()
    
    # 保持主线程运行
    try:
        while True:
            pass
    except KeyboardInterrupt:
        print("退出程序")

4.2 Node.js实现

安装依赖

npm install redis

发布者示例

const redis = require('redis');

// 创建Redis客户端
const publisher = redis.createClient({
  host: 'localhost',
  port: 6379
});

// 连接Redis
publisher.connect().then(() => {
  console.log('发布者连接成功');
  
  // 发布消息
  const channels = ['notifications:users', 'notifications:orders', 'notifications:system'];
  const messages = [
    JSON.stringify({ type: 'welcome', user_id: 123, message: 'Welcome to our platform!' }),
    JSON.stringify({ type: 'order_status', order_id: 456, status: 'shipped' }),
    JSON.stringify({ type: 'maintenance', message: 'System maintenance scheduled for tonight' }),
    JSON.stringify({ type: 'promotion', user_id: 123, discount: '20%' }),
    JSON.stringify({ type: 'order_status', order_id: 789, status: 'delivered' })
  ];
  
  // 定时发布消息
  let index = 0;
  const interval = setInterval(() => {
    if (index < messages.length) {
      const channel = channels[index % channels.length];
      const message = messages[index];
      
      publisher.publish(channel, message).then(count => {
        console.log(`发布到频道 ${channel}: ${message},${count} 个订阅者接收到`);
      }).catch(err => {
        console.error('发布消息失败:', err);
      });
      
      index++;
    } else {
      clearInterval(interval);
      publisher.disconnect();
      console.log('发布完成,断开连接');
    }
  }, 1000);
}).catch(err => {
  console.error('发布者连接失败:', err);
});

订阅者示例

const redis = require('redis');

// 创建Redis客户端
const subscriber = redis.createClient({
  host: 'localhost',
  port: 6379
});

// 连接Redis
subscriber.connect().then(() => {
  console.log('订阅者连接成功');
  
  // 订阅频道
  subscriber.subscribe('notifications:users', (message) => {
    console.log('接收到用户通知:', message);
  });
  
  subscriber.subscribe('notifications:orders', (message) => {
    console.log('接收到订单通知:', message);
  });
  
  // 模式订阅
  subscriber.pSubscribe('notifications:*', (message, channel) => {
    console.log(`通过模式订阅接收到 ${channel} 的消息:`, message);
  });
  
  console.log('开始接收消息...');
  
  // 10秒后取消订阅
  setTimeout(() => {
    console.log('取消订阅 notifications:orders');
    subscriber.unsubscribe('notifications:orders');
    
    // 15秒后取消所有订阅
    setTimeout(() => {
      console.log('取消所有订阅');
      subscriber.unsubscribe();
      subscriber.pUnsubscribe();
      
      // 断开连接
      setTimeout(() => {
        subscriber.disconnect();
        console.log('断开连接');
      }, 2000);
    }, 5000);
  }, 10000);
}).catch(err => {
  console.error('订阅者连接失败:', err);
});

5. 发布订阅模式的高级特性

5.1 频道命名规范

推荐的频道命名规范

  • 使用冒号 : 作为分隔符,如 service:component:action
  • 从一般到具体,如 chat:room:123
  • 使用小写字母和下划线
  • 保持频道名称简洁明了

示例

  • user:login - 用户登录事件
  • order:created - 订单创建事件
  • payment:completed - 支付完成事件
  • system:error - 系统错误事件
  • notification:user:123 - 特定用户的通知

5.2 消息格式

推荐的消息格式

  • 使用JSON格式,便于结构化数据传输
  • 包含必要的元数据,如消息类型、时间戳等
  • 保持消息大小合理,避免过大的消息

示例消息格式

{
  "type": "order_status_change",
  "timestamp": 1620000000000,
  "data": {
    "order_id": "123456",
    "status": "shipped",
    "tracking_number": "TN1234567890",
    "updated_at": "2023-05-03T10:30:00Z"
  },
  "version": "1.0"
}

5.3 消息持久化方案

由于Redis Pub/Sub默认不持久化消息,可以通过以下方式实现消息持久化:

方案1:结合List实现持久化

  • 发布者在发布消息到频道的同时,将消息存储到List中
  • 订阅者在启动时,先从List中获取历史消息,再订阅实时消息

实现示例

def publish_with_persistence(channel, message):
    """发布消息并持久化"""
    # 发布到频道
    redis_client.publish(channel, message)
    # 存储到List(设置过期时间,避免内存占用过大)
    history_key = f"history:{channel}"
    redis_client.lpush(history_key, message)
    redis_client.ltrim(history_key, 0, 999)  # 只保留最近1000条消息
    redis_client.expire(history_key, 86400)  # 24小时过期

def get_history_messages(channel, limit=100):
    """获取历史消息"""
    history_key = f"history:{channel}"
    return redis_client.lrange(history_key, 0, limit-1)

方案2:使用Redis Stream

Redis 5.0+ 引入的Stream数据结构,提供了更完善的消息队列功能,支持:

  • 消息持久化
  • 消息ID和顺序
  • 消费者组
  • 消息确认
  • 消息回溯

实现示例

def publish_to_stream(stream, message):
    """发布消息到Stream"""
    # 添加消息到Stream
    return redis_client.xadd(stream, message)

def consume_from_stream(stream, consumer_group, consumer_name, start_id='>'):
    """从Stream消费消息"""
    # 创建消费者组(如果不存在)
    try:
        redis_client.xgroup_create(stream, consumer_group, id='0', mkstream=True)
    except Exception:
        # 消费者组已存在,忽略错误
        pass
    
    # 消费消息
    while True:
        messages = redis_client.xreadgroup(
            consumer_group, consumer_name, {stream: start_id}, count=1, block=0
        )
        if messages:
            for message in messages:
                stream_name, message_list = message
                for msg_id, msg_data in message_list:
                    print(f"接收到消息 {msg_id}: {msg_data}")
                    # 处理消息
                    # 确认消息处理完成
                    redis_client.xack(stream, consumer_group, msg_id)

6. 实际应用场景

6.1 实时聊天系统

需求:实现用户之间的实时聊天功能,支持不同的聊天频道。

实现

# 发布消息(发送聊天消息)
def send_chat_message(channel, user_id, username, content):
    message = {
        'user_id': user_id,
        'username': username,
        'content': content,
        'timestamp': time.time()
    }
    redis_client.publish(f"chat:{channel}", json.dumps(message))

# 订阅消息(接收聊天消息)
def subscribe_to_chat(channel):
    pubsub = redis_client.pubsub()
    pubsub.subscribe(f"chat:{channel}")
    for message in pubsub.listen():
        if message['type'] == 'message':
            chat_message = json.loads(message['data'])
            print(f"[{chat_message['username']}] {chat_message['content']}")

6.2 系统通知

需求:实现系统级别的通知功能,如用户注册、订单状态变更、系统维护等。

实现

def send_notification(notification_type, user_id=None, data=None):
    """发送通知"""
    message = {
        'type': notification_type,
        'user_id': user_id,
        'data': data,
        'timestamp': time.time()
    }
    
    # 发送到用户特定频道(如果指定了用户)
    if user_id:
        redis_client.publish(f"notification:user:{user_id}", json.dumps(message))
    
    # 发送到类型频道
    redis_client.publish(f"notification:type:{notification_type}", json.dumps(message))
    
    # 发送到所有通知频道
    redis_client.publish("notification:all", json.dumps(message))

# 示例:发送订单状态变更通知
send_notification(
    'order_status',
    user_id=123,
    data={'order_id': 456, 'status': 'shipped', 'tracking_number': 'TN123456'}
)

6.3 实时监控和告警

需求:实现系统监控和告警功能,当系统指标超过阈值时,及时通知相关人员。

实现

def monitor_system_metrics():
    """监控系统指标"""
    while True:
        # 模拟获取系统指标
        cpu_usage = random.uniform(0, 100)
        memory_usage = random.uniform(0, 100)
        disk_usage = random.uniform(0, 100)
        
        # 发布指标数据
        metrics = {
            'cpu': cpu_usage,
            'memory': memory_usage,
            'disk': disk_usage,
            'timestamp': time.time()
        }
        redis_client.publish('metrics:system', json.dumps(metrics))
        
        # 检查是否需要告警
        if cpu_usage > 90:
            alert = {
                'level': 'critical',
                'message': f'CPU usage is too high: {cpu_usage:.2f}%',
                'timestamp': time.time()
            }
            redis_client.publish('alert:system', json.dumps(alert))
        
        time.sleep(5)  # 每5秒检查一次

# 告警订阅者
def alert_subscriber():
    pubsub = redis_client.pubsub()
    pubsub.subscribe('alert:system')
    for message in pubsub.listen():
        if message['type'] == 'message':
            alert = json.loads(message['data'])
            print(f"【{alert['level'].upper()}】{alert['message']}")
            # 可以集成邮件、短信等通知方式

6.4 分布式系统事件总线

需求:在分布式系统中,不同服务之间通过事件总线进行通信。

实现

class EventBus:
    """分布式事件总线"""
    def __init__(self, redis_client):
        self.redis_client = redis_client
    
    def publish(self, event_type, data):
        """发布事件"""
        event = {
            'event_type': event_type,
            'data': data,
            'timestamp': time.time(),
            'event_id': str(uuid.uuid4())
        }
        channel = f"event:{event_type}"
        result = self.redis_client.publish(channel, json.dumps(event))
        return {
            'event_id': event['event_id'],
            'channel': channel,
            'subscribers_count': result
        }
    
    def subscribe(self, event_type, callback):
        """订阅事件"""
        def _subscribe_handler():
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe(f"event:{event_type}")
            for message in pubsub.listen():
                if message['type'] == 'message':
                    event = json.loads(message['data'])
                    callback(event)
        
        # 启动订阅线程
        thread = threading.Thread(target=_subscribe_handler)
        thread.daemon = True
        thread.start()
        return thread

# 使用示例
event_bus = EventBus(redis_client)

# 订阅用户注册事件
def handle_user_registered(event):
    print(f"用户注册事件: {event['data']}")
    # 处理用户注册后的逻辑,如发送欢迎邮件、初始化用户数据等

event_bus.subscribe('user.registered', handle_user_registered)

# 发布用户注册事件
event_bus.publish('user.registered', {
    'user_id': 123,
    'username': 'john_doe',
    'email': 'john@example.com'
})

7. 最佳实践

7.1 性能优化

  • 合理使用频道:避免创建过多频道,建议根据业务逻辑合理分类
  • 消息大小控制:消息内容不宜过大,建议控制在1KB以内
  • 使用管道:发布多条消息时,使用管道(Pipeline)减少网络往返时间
  • 连接管理:订阅者应使用单独的Redis连接,避免与其他操作混用
  • 模式订阅优化:避免使用过于宽泛的模式(如 *),减少不必要的消息传递

7.2 可靠性保障

  • 考虑使用Redis Stream:对于需要消息持久化和确认的场景,推荐使用Stream
  • 实现消息重试机制:关键消息可以结合数据库或其他持久化方式,实现重试逻辑
  • 监控订阅状态:定期检查订阅连接状态,确保订阅正常
  • 使用Redis Sentinel或Cluster:提高Redis服务的可用性,减少因Redis故障导致的Pub/Sub中断

7.3 安全性考虑

  • 限制Redis的网络访问:只允许必要的IP访问Redis服务
  • 设置密码认证:防止未授权访问
  • 使用ACL:限制Pub/Sub相关命令的权限
  • 消息内容加密:对于敏感消息,考虑在发送前加密
  • 避免在消息中存储敏感信息:如密码、令牌等

7.4 适用场景判断

适合使用Redis Pub/Sub的场景

  • 实时通知和告警
  • 聊天和消息系统
  • 系统事件广播
  • 监控数据实时传输
  • 微服务之间的轻量级通信

不适合的场景

  • 需要消息持久化的场景
  • 对消息可靠性要求极高的场景
  • 消息量极大的场景
  • 需要复杂的消息路由和处理逻辑的场景

8. 总结

Redis Pub/Sub提供了一种简单高效的发布订阅机制,适合实时消息传递场景。通过本文的学习,我们了解了:

  • 发布订阅模式的基本概念和优缺点
  • Redis Pub/Sub的核心命令和使用方法
  • 如何使用Python和Node.js实现发布订阅功能
  • 如何结合其他Redis功能实现消息持久化
  • 实际应用场景和最佳实践

在实际应用中,我们需要根据具体需求选择合适的消息传递方案。对于简单的实时通知场景,Redis Pub/Sub是一个轻量级的选择;对于需要更完善功能的场景,可以考虑使用Redis Stream或专业的消息队列系统(如RabbitMQ、Kafka等)。

通过合理使用Redis Pub/Sub,我们可以构建松耦合、实时性高的分布式系统,提高系统的可扩展性和维护性。

« 上一篇 Redis作为消息队列的应用 下一篇 » Redis实时分析应用