于今腐草无萤火,终古垂杨有暮鸦。这篇文章主要讲述分布式技术专题分布式消息队列-RocketMQ延迟消息实现原理和源码分析相关的知识,希望能为你提供帮助。
痛点背景
业务场景之前方案方案评估
- 优点:是实现简单,缺点呢?
- 缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
- 能够在指定时间间隔后触发某个业务操作
- 能够应对业务数据量特别大的特殊场景
- RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
- 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
- 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
- broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
延时队列生产者端:延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。
public class Producer {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//设置namesrv地址
producer.setNamesrvAddr("111.231.110.149:9876");
//启动生产者
producer.start();
//发送10条消息
for (int i = 0;
i <
10;
i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//设置消息延时级别3对应10秒后发送
//延时级别1对应延时1秒后发送消息
//延时级别2对应延时5秒后发送消息
//延时级别3对应延时10秒后发送消息
//以此类推。
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
初始化load方法
public boolean load() {
boolean result = super.load();
result = result &
&
this.parseDelayLevel();
return result;
}
ScheduleMessageService继承自ConfigManager类,super.load()方法对应
public boolean load() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
延时队列源码分析:先从延时消息延迟级别设置与broker端消息持久化入手。
具体实现启动延迟消息定时任务内部变量含义延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义
文章图片
- delayLevelTable定义了延迟级别和延迟时间的对应关系
- offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
文章图片
延迟消息投递
文章图片
其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
文章图片
核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown < = 0)中,看下代码
文章图片
重新投递实现总结
- 优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
- 【分布式技术专题分布式消息队列-RocketMQ延迟消息实现原理和源码分析】缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
- 改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率
- producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
- broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
- mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
- 根据消费偏移量offset从commitLog中解析出对应消息
- 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
- 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递
推荐阅读
- 千亿级模型在离线一致性保障方案详解
- WEB安全新玩法 [4] 防护邮箱密码重置漏洞
- 你想学的黑客(攻击)技术全在这了,一篇打包带走!
- SpringCloud技术专题「Feign」从源码层面让你认识Feign工作流程和运作机制
- 靶机DC-1
- WEB安全新玩法 [11] 防范批量注册
- 测试开发之系统篇-Docker常用操作
- 如何在Django中从HTML创建PDF
- 使用jQuery Plugin Multiselect使用复选框实现可过滤的多选