RocketMQ|RocketMQ -- 消息拉取
上一篇提到,由于消息队列负载机制,会往pullRequestQueue队列放入一个个的PullRequest。
文章图片
这些PullRequest会有一个专门的线程,把它取出来并封装成服务端的一个Request,发送给broker。
在发送服务端之前,需要知道broker的地址在哪,需要从什么偏移量开始拉取(偏移量集群模式存broker,广播模式存本地),一次性拉多少数据。
文章图片
【RocketMQ|RocketMQ -- 消息拉取】broker收到请求后,就会从commitlog中,根据偏移量把所需要的消息给取出来,由于broker的主从同步,这边返回消息的时候,也会告知下一次拉取是从主broker拉取还是slave的broker拉取数据。
文章图片
从commit拉取消息的时候,会有这几个情况(下面忽略offsetCheckInSlave判断):
- 当前消息队列并没有消息,则下次拉取消息的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次直接从0开始。
文章图片
- 当前消息队列有消息,但是我们拉取的偏移量比队列里最小的偏移量还小,比如我们需要拉取100的数据,但是消息队列的最小偏移量是500。则下次拉取消息的时候,如果这个broker是主节点,还是这个偏移量。如果这个broker是从节点,则下次直接从500开始。
文章图片
- 如果我们我们拉取的偏移量刚好等于消息队列里最大的偏移量,所以我们也没有数据可以消费了,那下次拉取的时候,还是100这个偏移量。
文章图片
- 如果我们我们拉取的偏移量大于消息队列里最大的偏移量,所以我们也没有数据可以消费了,那下次拉取的时候,这里还要判断消息队列里最小的偏移量是否等于0。
文章图片
- 正常情况下,我们需要的消息是在消息队列里最小的偏移量和最大的偏移量之间,那就直接把数据从commitlog中取出来并返回。
文章图片
消费端接收broker响应后,就会根据上面各个情况的偏移量进行更新,并且把消息存入PullRequest的processQueue中。
文章图片
并且再把PullRequest放入pullRequestQueue队列中,等待下次拉取。
processQueue的消息如果来不及消费,会一直的堆积,所以PullRequest在拉取消息的时候,会先判断processQueue里的消息数量是否已经超过1000,如果超过了,则当前不拉取,并放入pullRequestQueue队列中,50ms后才可以继续拉取。
此外还会判断processQueue中消息的大小、processQueue中队列最大偏移量与最小偏离量的间距,如果超过了阈值,也会放入pullRequestQueue队列中,50ms后才可以继续拉取。
推荐阅读
- VC++|VC++简单绘图 MFC消息映射机制 MessageBox函数
- qq来消息不显示在手机屏幕上怎么办
- JSTL格式标签
- 注册表个性化调整Win10系统通知消息显示在右下角
- 微信订阅号没有关注为什么还会发送推送消息
- 怎么取消微信里的订阅号消息
- 微信群发不出去消息红色感叹号
- 微信朋友圈消息记录怎么查看
- 宜搭小技巧|第一时间看到审批进度(消息通知来帮你)
- 微信接收消息没有声音怎么回事