RocketMQ|RocketMQ -- 消息拉取

上一篇提到,由于消息队列负载机制,会往pullRequestQueue队列放入一个个的PullRequest。
RocketMQ|RocketMQ -- 消息拉取
文章图片

这些PullRequest会有一个专门的线程,把它取出来并封装成服务端的一个Request,发送给broker。
在发送服务端之前,需要知道broker的地址在哪,需要从什么偏移量开始拉取(偏移量集群模式存broker,广播模式存本地),一次性拉多少数据。
RocketMQ|RocketMQ -- 消息拉取
文章图片

【RocketMQ|RocketMQ -- 消息拉取】broker收到请求后,就会从commitlog中,根据偏移量把所需要的消息给取出来,由于broker的主从同步,这边返回消息的时候,也会告知下一次拉取是从主broker拉取还是slave的broker拉取数据。
RocketMQ|RocketMQ -- 消息拉取
文章图片

从commit拉取消息的时候,会有这几个情况(下面忽略offsetCheckInSlave判断):

  1. 当前消息队列并没有消息,则下次拉取消息的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次直接从0开始。
RocketMQ|RocketMQ -- 消息拉取
文章图片

  1. 当前消息队列有消息,但是我们拉取的偏移量比队列里最小的偏移量还小,比如我们需要拉取100的数据,但是消息队列的最小偏移量是500。则下次拉取消息的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次直接从500开始。
RocketMQ|RocketMQ -- 消息拉取
文章图片

  1. 如果我们我们拉取的偏移量刚好等于消息队列里最大的偏移量,所以我们也没有数据可以消费了,那下次拉取的时候,还是100这个偏移量。
RocketMQ|RocketMQ -- 消息拉取
文章图片

  1. 如果我们我们拉取的偏移量大于消息队列里最大的偏移量,所以我们也没有数据可以消费了,那下次拉取的时候,这里还要判断消息队列里最小的偏移量是否等于0。
RocketMQ|RocketMQ -- 消息拉取
文章图片

  1. 正常情况下,我们需要的消息是在消息队列里最小的偏移量和最大的偏移量之间,那就直接把数据从commitlog中取出来并返回。
RocketMQ|RocketMQ -- 消息拉取
文章图片

消费端接收broker响应后,就会根据上面各个情况的偏移量进行更新,并且把消息存入PullRequest的processQueue中。
RocketMQ|RocketMQ -- 消息拉取
文章图片

并且再把PullRequest放入pullRequestQueue队列中,等待下次拉取。
processQueue的消息如果来不及消费,会一直的堆积,所以PullRequest在拉取消息的时候,会先判断processQueue里的消息数量是否已经超过1000,如果超过了,则当前不拉取,并放入pullRequestQueue队列中,50ms后才可以继续拉取。
此外还会判断processQueue中消息的大小、processQueue中队列最大偏移量与最小偏离量的间距,如果超过了阈值,也会放入pullRequestQueue队列中,50ms后才可以继续拉取。

    推荐阅读