RocketMQ-延迟消息的处理流程介绍
概述
RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
Broker处理延迟消息
CommitLog.putMessage()
//获取消息的sysflagfinal int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //非事务消息 或 已commit事务消息if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery 判断消息是否设置延迟if (msg.getDelayTimeLevel() > 0) {//判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); }//延迟消息的topic为 SCHEDULE_TOPIC_XXXXtopic = ScheduleMessageService.SCHEDULE_TOPIC; //获取延迟级别,一个延迟级别对应一个QueuequeueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId//消息原始的topic,queueid保存到消息的property中MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }}
1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息
2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息
3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX
5、获取延迟级别,一个延迟级别对应一个Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
启动延迟消息定时任务
ScheduleMessageService.start()
文章图片
延迟消息投递
文章图片
文章图片
文章图片
【RocketMQ-延迟消息的处理流程介绍】以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
推荐阅读
- 危险也是机会
- python学习之|python学习之 实现QQ自动发送消息
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 夏夜|夏夜 我们
- Vue组件之事件总线和消息发布订阅详解
- Redis——发布订阅/消息队列
- Java消息中间件概念基础
- 使用交叉点观察器延迟加载图像以提高性能
- 【20190827复盘】——好消息
- MQ(消息队列)功能介绍