RocketMQ|RocketMQ -- 消费者启动流程
消费者在发送消息的时候,有一个分组的概念。也就是说,有一个或者多个消费者的名称,是同一个,这些名称一样的消费者,组成了一个消费组。
比如下图中,两个group_a是一个消费组,两个group_b是一个消费组。
文章图片
当broker有消息a进来的时候,那这个消息都会同时被这两个消费组所消费。
文章图片
我们也看到了每个消费组是有多个消费者的,如果消息被group_a消费的时候,两个消费者是怎么消费的呢?
这里就涉及到了集群模式与广播模式两种消费模式。
在集群模式下,这个消息只能被消费组内一个消费者进行消费。比如group_a是集群模式,那此时只能一个消费者进行消费。
文章图片
在广播模式下,这个消息会被消费组下的每一个消费者进行消费比如group_b是广播模式,那此时两个消费者都进行消费。
文章图片
消费组和消费模式这么重要,所以消费者启动的时候,就需要去校验消费组和消费模式。
文章图片
构建Topic订阅信息,比如这个消费者需要订阅TopicTest的消息,就是在这里构建的。这个步骤中,Topic订阅信息是一个map,所以会存在多个的。
文章图片
基础信息准备好了,就开始初始化MQClientInstance、RebalanceImple(消息重新负载实现类)。
文章图片
初始化消息进度,这里根据消费模式会有两种情况:
一个是集群模式,由于消费组的几个消费者只能一个进行消费,如果这个消费者挂了,那另外一个消费者需要从之前的位置开始消费,所以这个进度不能保存在本地,需要保存在broker中,最终从broker拉取消费进度。
另外一个是集群模式,消费者的消息自己维护自己的进度,所以这个进度保存在自己的本地,最终从本地拉取消费进度。
文章图片
如果消息是顺序消费的,那创建一个顺序消息消费服务,如果不是,则创建并发消息消费服务,这两个后面来讲细节。
创建好后启动相应的消息消费服务。
文章图片
启动消费者客户端,这里的启动跟生产者是一样的,包括消息重新负载的启动等。
【RocketMQ|RocketMQ -- 消费者启动流程】
文章图片
推荐阅读
- springboot-单元测试Test没有绿色三角标识启动箭头
- Python 自动清理电脑垃圾文件,一键启动即可
- 《Win10——如何进入高级启动选项(》)
- 抖音|抖音 Android 性能优化系列(启动优化之理论和工具篇)
- 环境配置|启动jupyter notebook链接不上内核 + 终端报错(Replacing stale connection)
- 解决Hadoop集群hdfs无法启动DataNode的问题
- 带你详细了解mongodb数据库
- Jetpack|Kotlin 之 协程(二)启动取消协程
- Android|后台启动 Activity
- Android高级进阶|Android 10.0以后高版本后台启动Activity的方法