RocketMQ|RocketMQ -- 写在消息拉取前
消费者消费的时候,需要指定去消费哪个Topic的消息,比如TopicTest。
和生产者一样,消费者也需要去NameServer拉取TopicTest对应的元数据信息。
文章图片
如上图所示,TopicTest对应这两个broker,每个broker都有两个MessageQueue,那消费者从哪里拉取数据呢?
【RocketMQ|RocketMQ -- 写在消息拉取前】比如我们有两个消费者,消费者在启动的时候,会向每个broker发送心跳包,这样每一个broker都有消费者的信息,我们也可以从任意一个broker获取所有消费者的信息。
既然有了元数据信息,还有消费者的信息,那就可以通过分配算法,知道当前这个消费者去哪个MessageQueue拉取数据。
目前RocketMQ默认提供6种分配算法,包括平均分配(AllocateMessageQueueAveragely)、平均轮询分配(AllocateMessageQueueAveragelyByCircle)、一致性hash(AllocateMessageQueueConsistentHash)、根据配置(AllocateMessageQueueByConfig)、根据Broker部署机房名(AllocateMessageQueueByMachineRoom)、就近机房模式(AllocateMachineRoomNearby)。
默认是平均分配,比如有4个MessageQueue,那两个消费者各2个。消费者1分配的是MessageQueue0和MessageQueue1,消费者2分配MessageQueue2和MessageQueue3。
如果是平均轮询分配,那消费者1分配1个,消费者2分配1个,消费者1再分配一个,消费者2再分配一个,最终消费者1分配的是MessageQueue0和MessageQueue2,消费者2分配MessageQueue1和MessageQueue3。
我们接着平均分配进行往下说,以消费者1为例。
文章图片
此时已经知道消费者从MessageQueue0和MessageQueue1来消费,那从哪里开始消费呢?
消费的模式又分为从队列最新偏移量开始消费(CONSUME_FROM_LAST_OFFSET)、从头开始消费(CONSUME_FROM_FIRST_OFFSET)、从消费者启动的时间戳对应的消费进度开始消费(CONSUME_FROM_TIMESTAMP)。
根据以上的信息,我们就可以整合一个请求信息PullRequest,包括消费组、偏移量、MessageQueue、ProcessQueue(这个后面来解释),存放在pullRequestQueue链表里。
文章图片
推荐阅读
- RocketMQ 开源爱好者请注意邀您共探行业应用与生产实践
- 微服务|RabbitMQ之消息可靠性、死信交换机、惰性队列及集群
- NLP自然语言处理|2022/1/22记录网页
- RocketMQ|RocketMQ -- 消费者启动流程
- python发送消息到微信_通过python登录微信发送消息
- python开发企业微信应用,使用Python发送企业微信消息
- 车联网平台百万级消息吞吐架构设计
- 基于long|基于long pull实现简易的消息系统参考
- Kafka -- 消息发送存储流程
- RocketMQ之NameServer架构设计及启动关闭流程源码分析