RocketMQ|RocketMQ -- 消费者启动流程

消费者在发送消息的时候,有一个分组的概念。也就是说,有一个或者多个消费者的名称,是同一个,这些名称一样的消费者,组成了一个消费组。
比如下图中,两个group_a是一个消费组,两个group_b是一个消费组。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

当broker有消息a进来的时候,那这个消息都会同时被这两个消费组所消费。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

我们也看到了每个消费组是有多个消费者的,如果消息被group_a消费的时候,两个消费者是怎么消费的呢?
这里就涉及到了集群模式与广播模式两种消费模式。
在集群模式下,这个消息只能被消费组内一个消费者进行消费。比如group_a是集群模式,那此时只能一个消费者进行消费。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

在广播模式下,这个消息会被消费组下的每一个消费者进行消费比如group_b是广播模式,那此时两个消费者都进行消费。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

消费组和消费模式这么重要,所以消费者启动的时候,就需要去校验消费组和消费模式。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

构建Topic订阅信息,比如这个消费者需要订阅TopicTest的消息,就是在这里构建的。这个步骤中,Topic订阅信息是一个map,所以会存在多个的。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

基础信息准备好了,就开始初始化MQClientInstance、RebalanceImple(消息重新负载实现类)。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

初始化消息进度,这里根据消费模式会有两种情况:
一个是集群模式,由于消费组的几个消费者只能一个进行消费,如果这个消费者挂了,那另外一个消费者需要从之前的位置开始消费,所以这个进度不能保存在本地,需要保存在broker中,最终从broker拉取消费进度。
另外一个是集群模式,消费者的消息自己维护自己的进度,所以这个进度保存在自己的本地,最终从本地拉取消费进度。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

如果消息是顺序消费的,那创建一个顺序消息消费服务,如果不是,则创建并发消息消费服务,这两个后面来讲细节。
创建好后启动相应的消息消费服务。
RocketMQ|RocketMQ -- 消费者启动流程
文章图片

启动消费者客户端,这里的启动跟生产者是一样的,包括消息重新负载的启动等。
【RocketMQ|RocketMQ -- 消费者启动流程】RocketMQ|RocketMQ -- 消费者启动流程
文章图片

    推荐阅读