现在的互联网应用基本上都是采用分布式系统架构进行设计的,而很多分布式系统必备的一个基础软件就是消息队列。
消息队列要能支持组件通信消息的快速读写,而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个问题:“Redis 适合做消息队列吗?”
其实,这个问题的背后,隐含着两方面的核心问题:
- 消息队列的消息存取需求是什么?
- Redis 如何实现消息队列的需求?
只有把这两方面的知识和实践经验串连起来,才能彻底理解基于 Redis 实现消息队列的技术实践。以后需要为分布式系统组件做消息队列选型时,就可以根据组件通信量和消息通信速度的要求,选择出适合的 Redis 消息队列方案。目前使用Redis做消息队列可以有如下三种方式。
一、消息队列的消息存取需求
在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。
假设组件 1 需要对采集到的数据进行求和计算,并写入数据库,但是,消息到达的速度很快,组件 1 没有办法及时地既做采集,又做计算,并且写入数据库。所以,我们可以使用基于消息队列的通信,让组件 1 把数据 x 和 y 保存为 JSON 格式的消息,再发到消息队列,这样它就可以继续接收新的数据了。组件 2 则异步地从消息队列中把数据读取出来,在服务器 2 上进行求和计算后,再写入数据库。这个过程如下图所示:
我们一般把消息队列中发送消息的组件称为生产者(例子中的组件 1),把接收消息的组件称为消费者(例子中的组件 2),下图展示了一个通用的消息队列的架构模型:
在使用消息队列时,消费者可以异步读取生产者消息,然后再进行处理。这样一来,即使生产者发送消息的速度远远超过了消费者处理消息的速度,生产者已经发送的消息也可以缓存在消息队列中,避免阻塞生产者,这是消息队列作为分布式组件通信的一大优势。
不过,消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
需求一:消息保序
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
需求二:重复消息处理
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
需求三:消息可靠性保证
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
二、基于 List 的消息队列解决方案
首先,我们先从最简单的场景开始讲起。
如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。
因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。
如果把 List 当作队列,你可以这么来用。
生产者使用 LPUSH 发布消息:
1 | 127.0.0.1:6379> LPUSH queue msg1 |
消费者这一侧,使用 RPOP 拉取消息:
1 | 127.0.0.1:6379> RPOP queue |
这个模型非常简单,也很容易理解。
但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。
1 | 127.0.0.1:6379> RPOP queue |
而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:
1 | while true: |
如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。
怎么解决这个问题呢?
也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:
1 | while true: |
这就解决了 CPU 空转问题。
这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。
假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。
要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。
鱼和熊掌不可兼得。
那如何做,既能及时处理新消息,还能避免 CPU 空转呢?
Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?
幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
现在,你可以这样来拉取消息了:
1 | while true: |
使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。
这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。
使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。
这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。
解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?
我们一起来分析一下:
- 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
- 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了
第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。
第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。
这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。
针对这 2 个问题怎么解决呢?我们一个个来看。
三、基于 Pub/Sub 的消息队列解决方案
从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。
它正好可以解决前面提到的第一个问题:重复消费。
即多组生产者、消费者的场景,我们来看它是如何做的。
Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。
首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。
1 | # 2个消费者 都订阅一个队列 |
此时,2 个消费者都会被阻塞住,等待新消息的到来。
之后,再启动一个生产者,发布一条消息。
1 | 127.0.0.1:6379> PUBLISH queue msg1 |
这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。
1 | 127.0.0.1:6379> SUBSCRIBE queue |
使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。
除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。
1 | # 订阅符合规则的队列 |
这里的消费者,订阅了 queue.* 相关的队列消息。
之后,生产者分别向 queue.p1 和 queue.p2 发布消息。
1 | 127.0.0.1:6379> PUBLISH queue.p1 msg1 |
这时再看消费者,它就可以接收到这 2 个生产者的消息了。
1 | 127.0.0.1:6379> PSUBSCRIBE queue.* |
我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。
讲完了它的优点,那它有什么缺点呢?
其实,Pub/Sub 最大问题是:丢数据。
如果发生以下场景,就有可能导致数据丢失:
- 消费者下线
- Redis 宕机
- 消息堆积
究竟是怎么回事?
这其实与 Pub/Sub 的实现方式有很大关系。
Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。
一个完整的发布、订阅消息处理流程是这样的:
- 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
- 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它
整个过程中,没有任何的数据存储,一切都是实时转发的。
这种设计方案,就导致了上面提到的那些问题。
例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。
所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。
另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。
也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。
最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?
当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!
这是怎么回事?
还是回到 Pub/Sub 的实现细节上来说。
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。
当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
之后,消费者不断地从缓冲区读取消息,处理消息。看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。
这种设计方案,就导致了上面提到的那些问题。
例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。
所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。
另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。
也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。
最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?
当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!
这是怎么回事?
还是回到 Pub/Sub 的实现细节上来说。
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。
当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
之后,消费者不断地从缓冲区读取消息,处理消息。看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。
这种设计方案,就导致了上面提到的那些问题。
例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。
所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。
另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。
也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。
最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?
当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!
这是怎么回事?
还是回到 Pub/Sub 的实现细节上来说。
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。
当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
之后,消费者不断地从缓冲区读取消息,处理消息。
但是,问题就出在这个缓冲区上。
因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。
如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。
这时消费者就会消费失败,也会丢失数据。
如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。
它的参数含义如下:
- 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
- 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线
Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。
从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。
List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。
但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。
当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。
好了,现在我们总结一下 Pub/Sub 的优缺点:
- 支持发布 / 订阅,支持多组生产者、消费者处理消息
- 消费者下线,数据会丢失
- 不支持数据持久化,Redis 宕机,数据也会丢失
- 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
有没有发现,除了第一个是优点之外,剩下的都是缺点。
所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。
也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。
目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。
我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?
其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。
好,现在我们重新梳理一下,我们在使用消息队列时的需求。
当我们在使用一个消息队列时,希望它的功能如下:
- 支持阻塞等待拉取消息
- 支持发布 / 订阅模式
- 消费失败,可重新消费,消息不丢失
- 实例宕机,消息不丢失,数据可持久化
- 消息可堆积
Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?
其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。
Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。
这个项目的定位,就是一个基于内存的分布式消息队列中间件。
但由于种种原因,这个项目一直不温不火。
终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Streams。
下面我们就来看看,它能符合上面提到的这些要求吗?
四、基于 Streams 的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,我们来看 Stream 是如何解决上面这些问题的。
我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?
1. Streams 是如何实现消息队列需求的
首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:
下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?
- Streams 通过 XADD 和 XREAD 完成最简单的生产、消费模型。
- Streams 通过BLOCK 参数支持「阻塞式」拉取消息
- Stream 通过XGROUP和XREADGROUP支持发布 / 订阅模式
- Stream 通过XACK标记消息为「处理完成」
- Stream 数据会写入到 RDB 和 AOF 做持久化
- Stream 处理消息堆积方式,采用丢弃消息的方式
- 在发布消息时,可以指定队列的最大长度,防止队列积压导致内存爆炸。当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
XADD mystream MAXLEN 10000 * repo 1
- 在发布消息时,可以指定队列的最大长度,防止队列积压导致内存爆炸。当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
2. Streams与专业的消息队列对比
其实,一个专业的消息队列,必须要做到两大块:
- 消息不丢
- 消息可堆积
前面我们讨论的重点,很大篇幅围绕的是第一点展开的。
这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?
使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者。
消息是否会发生丢失,其重点也就在于以下 3 个环节:
- 生产者会不会丢消息?
- 消费者会不会丢消息?
- 队列中间件会不会丢消息?
1) 生产者会不会丢消息?
当生产者在发布消息时,可能发生以下异常情况:
- 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
- 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了
如果是情况 1,消息根本没发出去,那么重新发一次就好了。
如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。
生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。
也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。
但发现没有?这也意味着消息可能会重复发送。
是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。
那消费者这边,就需要多做一些逻辑了。
对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。
从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。
所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。
2) 消费者会不会丢消息?
这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?
要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。
这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。
无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。
所以,从这个角度来看,Redis 也是合格的。
3) 队列中间件会不会丢消息?
前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。
但是,如果队列中间件本身就不可靠呢?
毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。
在这个方面,Redis 其实没有达到要求。
Redis 在以下 2 个场景下,都会导致数据丢失。
- AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
- 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)
基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性。
所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。
再来看那些专业的消息队列中间件是如何解决这个问题的?
像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。
但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。
最后,我们来看消息积压怎么办?
4) 消息积压怎么办?
因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。
所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。
综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:
- Redis 本身可能会丢数据
- 面对消息积压,Redis 内存资源紧张
到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。
如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。
如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。
3. Streams操作命令简介
Streams提供了丰富的消息队列操作命令。
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID。
- XREAD:用于读取消息,可以按 ID 读取数据。
- XGROUP CREATE:创建消费组。
- XREADGROUP:按消费组形式读取消息。
- XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
1. XADD
语法格式:
XADD key ID field value [field value ...]
- key:队列名称,如果不存在就创建
- ID:消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
- field value [field value …],key-value类型数据
XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。我们执行下面的命令,就可以往名称为 mqstream 的消息队列中插入一条消息。
下面命令组成如下:
- 消息的键是 repo
- 值是 5。
- 消息队列名称后面的*,表示让 Redis 为插入的数据自动生成一个全局唯一的 ID。可以看到,消息的全局唯一 ID 由两部分组成
1631010568190-0
。- 1631010568190:数据插入时,以毫秒为单位计算的当前服务器时间
- 0:表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。
1 | # *表示让Redis自动生成消息ID |
2. XREAD
语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- [COUNT count]:用来获取消息的数量
- [BLOCK milliseconds]:用来设置阻塞模式和阻塞超时时间,默认为非阻塞
- id [id …]:用来设置读取的起始 ID,相当于 where id > $id,阻塞模式中可以使用 $ 来获取最新的消息 ID,非阻塞模式下无意义。
- key:队列名
当消费者需要读取消息时,可以直接使用 XREAD 命令从消息队列中读取。
XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。
例如,我们可以执行下面的命令,从 ID 号为 1631010542237-0 的消息开始,读取后续的所有消息(示例中一共 3 条)。参数说明如下:
- block:当消息队列中没有消息时,一旦设置了 BLOCK 配置项,XREAD 就会阻塞,阻塞的时长可以在 BLOCK 配置项进行设置。BLOCK 0 表示阻塞等待,不设置超时时间。
1 | 127.0.0.1:6379> XREAD BLOCK 0 STREAMS mqstream 1631010542237-0 |
看下面命令,其中,命令最后的“$”符号表示读取最新的消息,同时,我们设置了 block 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。下面命令中的 XREAD 执行后,消息队列 mqstream 中一直没有消息,所以,XREAD 在 10 秒后返回空值(nil)。
1 | 127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mqstream $ |
3. XGROUP CREATE
语法格式:
1
2 > XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
>
[CREATE key groupname id-or-$]
:在指定的 key 中创建分组,并且指定分组读取消息的起点,如果指定了0,分组将可以读取指定 key 的所有历史消息,如果指定了 $,分组将可以读取指定 key 的新消息,将不能读取历史消息。也可以指定任意的开始 ID。[SETID key groupname id-or-$]
:重新给已存在的分组设置消息读取的起点。例如将起点设置为 0就可以重新读取所有的历史消息[DESTROY key groupname]
:销毁指定 key 中的一个分组[CREATECONSUMER key groupname consumername]
:在指定的 key 和指定的分组中创建一个消费者。当某个命令提及了新的消费者名称时,也会自动创建新的消费者。[DELCONSUMER key groupname consumername]
:在指定的 key 和指定的分组中销毁一个消费者。$
:表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
执行下面的命令,
XGROUP create mqstream group1 0
:创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream。XREADGROUP group group1 consumer1 streams mqstream >
:让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共 6 条)。XREADGROUP group group1 consumer1 streams mqstream >
:再次执行上述2命令查看返回效果XREADGROUP group group1 consumer2 streams mqstream 0
:需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了.XREADGROUP group group2 consumer1 count 1 streams mqstream >
:使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
1 | # 1 |
4. XREADGROUP GROUP
使用 XREADGROUP GROUP 读取消费组中的消息
语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- GROUP:固定
- group:消费组名
- consumer:消费者名
- [COUNT count]:每次获取消息的数量
- [BLOCK milliseconds]:阻塞模式和超时时间
- [NOACK]:不需要确认消息,适用于不怎么重要的可以丢失的消息
- STREAMS:固定
- key [key …]:指定的队列名
- ID [ID …]:指定的消息 ID,> 指定读取所有未消费的消息,其他值指定被挂起的消息
执行如下命令:
XREADGROUP group group1 consumer1 streams mqstream >
:让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共 6 条)。XREADGROUP group group1 consumer1 streams mqstream >
:再次执行上述2命令查看返回效果XREADGROUP group group1 consumer2 streams mqstream 0
:需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了.XREADGROUP group group2 consumer1 count 1 streams mqstream >
:使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
1 | # 1 |
5. XPENDING和XACK
语法格式:
XPENDING key group [start end count] [consumer]
- key,指定的 key
- group,指定的分组
- [start end count],起始 ID 和结束 ID 还有数量
- consumer,消费者名字
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
执行如下命令:
- 查看 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。
- 查看某个消费者具体读取了哪些数据
- 通过2步骤可以看到,consumer2 已读取的消息的 ID 是 1599274912765-0。一旦消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息。
1 | # 1 |
五、总结
前面介绍了 List、Pub/Sub、Stream 在做队列的使用方式
之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。
最后,我们得出 Redis 做队列的合适场景。
这里我也列了一个表格,总结了它们各自的优缺点。
六、后记
最后,再聊一聊关于「技术方案选型」的问题。
我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。
其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。
而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好。
你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:
- 业务功能角度
- 技术资源角度
这篇文章所讲到的内容,都是以业务功能角度出发做决策的。
但这里的第二点,从技术资源角度出发,其实也很重要。
技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案。
这个怎么解释呢?
简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。
我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。
如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。
但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。
而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。
所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关。
也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。
毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。
如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。
而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。
希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。