Skip to main content

Redis Stream MQ 使用Redis Stream 作为消息队列

Yujie LiuAbout 5 minComputer ScienceCacheMessage QueueRedis

Redis Stream MQ 使用Redis Stream 作为消息队列

Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

简单的消息队列

  • 生产者通过 XADD 命令插入一条消息
    • 参数:队列名称,是否(部分)指定一个id,key,value
    • 返回一个时间戳,第一部分表示服务器时间(毫秒),第二部分表示这个毫秒内的第几条数据
#向YJMQ消息队列中插入key为name,值为yujie的消息
127.0.0.1:6379> XADD YJMQ * name yujie
"1701290357031-0"
#这个返回值表示1701290357031毫秒的第一条数据
  • 消费者通过XREAD命令从消息队列中读取消息
    • 参数:STREAMS默认,队列名称,从某一毫秒之后的消息
    • 也可以使用BLOCK关键字,选择阻塞一段时间获取消息(当没有消息时)。
#记住是某一毫秒之后的下一条消息,如果直接复制上面存入的id,则会返回nil因为不存在吓一跳
127.0.0.1:6379> XREAD STREAMS YJMQ 1701290357031-0
(nil)

#因此减一毫秒,获取消息
127.0.0.1:6379> XREAD STREAMS YJMQ 1701290357030-0
1) 1) "YJMQ"
   2) 1) 1) "1701290357031-0"
         2) 1) "name"
            2) "yujie"
 
#阻塞10秒获取,$表示只获取新消息,由于之前插入的那条已经被read过,不算新消息了
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.05s)

交互流程

img

以上这些使用List也能实现,因此下面的才是新的。

带消费者组的消息队列

Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。但是不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)。

  • 创建两条从第一条消息开始读的消费者组
127.0.0.1:6379> XGROUP CREATE YJMQ group1 0-0
OK
127.0.0.1:6379> XGROUP CREATE YJMQ group2 0-0
OK
  • 从消费者组中读取尚未消费的数据
# 使用1组消费者1从YJMQ中读取数据,> 符号表示从第一条未被消费的数据读取
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS YJMQ >
1) 1) "YJMQ"
   2) 1) 1) "1701290357031-0"
         2) 1) "name"
            2) "yujie"
            
#1组的消费者2读取为空,因为唯一一条消息已经被消费者1读取         
127.0.0.1:6379> XREADGROUP GROUP group1 consumer2 STREAMS YJMQ >
(nil)

#但是,不同组的,比如2组的消费者1,就可以从该队列中重复获得该消息(由于1组和2组都是从0-0开始)
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS YJMQ >
1) 1) "YJMQ"
   2) 1) 1) "1701290357031-0"
         2) 1) "name"
            2) "yujie"

但是,使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

  • Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
#组1确认一条消息已经被消费
127.0.0.1:6379> XACK YJMQ group1 1701290357031-0
(integer) 1

#因此再想获取则获取不到消息了
127.0.0.1:6379> XREADGROUP GROUP group1 consumer2 STREAMS YJMQ >
(nil)
  • 我们可以使用XPENDING来查看组获取了哪些消息还未确认。
    • 信息:处理中消息的数量,时间范围从哪到哪,具体是哪个消费者持有多个消息
#插入两条消息
127.0.0.1:6379> XADD YJMQ * name1 yujie
"1701292195759-0"
127.0.0.1:6379> XADD YJMQ * name2 yuuuujie
"1701292210311-0"

#获取最新消息,由于这里没有用COUNT约束数量,因此两条都被组1的消费者1获取了
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS YJMQ >
1) 1) "YJMQ"
   2) 1) 1) "1701292195759-0"
         2) 1) "name1"
            2) "yujie"
      2) 1) "1701292210311-0"
         2) 1) "name2"
            2) "yuuuujie"
#查询消息
127.0.0.1:6379> XPENDING YJMQ group1
1) (integer) 2
2) "1701292195759-0"
3) "1701292210311-0"
4) 1) 1) "consumer1"
      2) "2"
      
#确认一下
127.0.0.1:6379> XACK YJMQ group1 1701292195759-0
(integer) 1
127.0.0.1:6379> XACK YJMQ group1 1701292210311-0
(integer) 1

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

img

总结

  • 消息保序:XADD/XREAD
  • 阻塞读取:XREAD block
  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
  • 支持消费组GROUP形式消费数据
Last update:
Contributors: Yujie