RocketMQ|RocketMQ -- 消息消费队列与索引文件
【RocketMQ|RocketMQ -- 消息消费队列与索引文件】在store目录下,除了commilog目录,还有consumequeue和index目录。
文章图片
consumequeue是消息消费队列存储目录,比如我们建了一个TopicTest,有四个MessageQueue,那在consumequeue目录下,就有一个TopicTest目录,TopicTest目录下还有0,1,2,3四个目录,对应着MessageQueue的个数。这些数字的下面的文件,就是实际上的数据。
前面已经讲了消息都是落在了commitlog日志文件中,我们消费的时候,却是根据topic来的,如果需要一个个的从commitlog日志文件中遍历某一个topic,那这个效率就非常低下了,所以就有了一个ConsumeQueue来记录每一个topic在commitlog的位置。
另外一个index是消息索引文件存储目录,通过Hash索引机制为消息建立索引,RocketMQ会将消息索引键与消息偏移量映射关系写入到IndexFile。
既然ConsumeQueue和IndexFile都是关联着commilog日志文件,那我们写入commitlog日志文件的时候,ConsumeQueue和IndexFile是什么时候写入的呢?
文章图片
broker启动的时候,还会启动一个线程,叫做ReputMessageService,负责把commitlog日志的更新事件传播出去,他记录commitlog从哪个偏移量开始转发消息给ConsumeQueue和IndexFile。
文章图片
每隔1ms,这个线程就会通过偏移量去commitlog日志文件里看看是否有新的消息进来,如果有,就会把消息进行转发,由于需要写入ConsumeQueue和IndexFile,所以这里就有两个转发器。
文章图片
CommitLogDispatcherBuildConsumeQueue会根据topic以及队列的ID,获取对应的ConsumeQueue文件(可以参考上面的目录树),把数据写入其中。
CommitLogDispatcherBuildIndex则会构建索引键,再写入数据。
文章图片
推荐阅读
- Kafka 怎么顺序消费(面试必备。。。)
- RocketMQ|RocketMQ -- 消息发送存储流程
- 开源IM项目OpenIM 客户端SDK架构剖析-确保消息的有序性,以及消息百分百可达
- Pulsar 也会重复消费?
- SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
- python编程实现撤销上一步操作_78行Python代码实现现微信撤回消息功能
- xiaoxi整理
- 消息复杂计算的抽象和简化
- vivo鲁班RocketMQ平台的消息灰度方案
- 笔记|Day30.守护线程、定时器、消费者模式 | wait、notify方法