本文侧重讲解 RocketMQ 的实际应用,关于理论部分,在另外一篇文章中再做探讨。在此不多说,直接进入实战吧。
1. 配置
通常开发直接依赖 rocketmq-spring-boot-starter
即可,starter 中包含了所有所需的依赖,如:
rocketmq-client
:封装了客户端的应用程序,还包含了netty的通讯服务。rocketmq-acl
:访问权限控制服务。
starter
还提供了很多现成封装类,如:RocketMQTemplate.java
、RocketMQListener.java
、RocketMQUtil.java
等,在应用开发时会经常用到。建议直接用上述的
rocketmq-spring-boot-starter
,见过有公司为了内部兼容,自己封装了一个服务替代官方的 starter
。但这个服务除了增加部分自定义程序外,其他的类和方法都是照拷贝 starter
的。当后续
rocketmq-spring-boot-starter
升级了,或修复bug、或拓展功能,公司内部的服务就很难升级了,除非再从头拷贝一遍。当公司内部没有相应的体量,建议不要学大厂自己封装基础服务,否则容易骑虎难下。pom依赖
org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.0
写文章时,starter 最新的版本是
2.2.0
,对应的 rocketmq-client
、rocketmq-acl
版本是 4.8.0
。认准版本好很重要,因为
rocketmq-spring-boot-starter
一直在快速迭代中,很多类和方法,在新版本中都会改变,例如下文会提到的tag、消息事务等,这是也是为什么不建议公司内部封装 starter。application
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: koala-dev-event-centre-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。
erbadagang-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
rocketmq 配置很多,除了基础有关server的配置以外,还有acl、producer、consumer等。但通常一个服务内会有多个consumer,建议在代码中实现。而producer如果只有一个,可以配置。
2.普通消息发送 有关 rocketmq 发送消息的源码,建议查看
org.apache.rocketmq.client.producer.DefaultMQProducer
,类中的属性大多对应于配置文件中的参数。2.1. 三种消息发送
这里只讨论普通的消息发送方式,区别于顺序、事务、延迟/定时等消息发送方式,当前分为三种:
- 同步(sync): 同步发送就是指 producer 发送消息后,会同步等待,在接收到 broker 响应结果后才继续发下一条消息。
- 异步(async): 异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。异步发送同样可以对消息的响应结果进行处理,需要在发送消息时实现异步发送回调接口。
- 单方向(oneWay): 是一种单方向通信方式,也就是说 producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。
三种发送方式对比
发送方式 | 发送TPS | 发送结果响应 | 可靠性 | 使用场景 |
---|---|---|---|---|
同步 | 一般 | 有 | 高 | 重要的通知场景 |
异步 | 快 | 有 | 高 | 比较注重 RT(响应时间)的场景 |
单方向 | 最快 | 无 | 低 | 可靠性要求并不高的场景 |
async 异步执行的线程池配置
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60000L, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
三种发送消息代码
private String convertDestination(String topic, String tag) {
return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
}/**
* 同步发送
*/
public SendResult syncSend(String topic, String tag, String content) {
String destination = this.convertDestination(topic, tag);
return rocketMQTemplate.syncSend(destination, content);
}/**
* 异步发送
*/
public void asyncSend(String topic, String tag, String content, SendCallback sendCallback) {
String destination = this.convertDestination(topic, tag);
rocketMQTemplate.asyncSend(destination, content, sendCallback);
}/**
* 单向发送
*/
public void sendOneWay(String topic, String tag, String content) {
String destination = this.convertDestination(topic, tag);
rocketMQTemplate.sendOneWay(destination, content);
}
2.2. 批量发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。
当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize,即配置文件中的
rocketmq.producer.max-message-size
。代码
/**
* 同步-批量发送
*/
public SendResult syncBatchSend(String topic, String tag, List contentList) {
String destination = this.convertDestination(topic, tag);
List messageList = contentList.stream()
.map(content -> MessageBuilder.withPayload(content).build())
.collect(Collectors.toList());
return rocketMQTemplate.syncSend(destination, messageList);
}
4. 标签tag rocketmq中,topic与tag都是业务上用来归类的标识,区分在于topic是一级分类,而tag可以理解为是二级分类。定义上:
- topic: 消息主题,通过Topic对不同的业务消息进行分类。
- tag: 消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。
- 消息类型是否一致: 如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。
- 业务是否相关联: 没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分。
- 消息优先级是否一致: 如同样是物流消息,盒马必须小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分。
- 消息量级是否相当: 有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。
如果没有tag的机制,消费端就得接收所有消息,反序列化后只处理自己对应“资源类型”的消息。有了tag机制,消息在进入消费端途中就自动进行过滤分发。
与 RabbitMQ AMQP协议 比较在使用tag机制后,第一时间让我想到了 RabbitMQ 的 AMQP协议,很像交换机和队列的机制。当使用tag之后,像扇形交换机;当没使用tag之后,就像直连交换机。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
示例代码:生产者通过前面示例代码中,最简单的发送同步消息代码来看,最新
starter
中封装的 RocketMQTemplate
类中,消息发送的目标是 String destination
。而 它包含了 topic
和 tag
。即私有转换方法中的: destination = topic:tag
。private final RocketMQTemplate rocketMQTemplate;
private String convertDestination(String topic, String tag) {
return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
}/**
* 同步发送
*/
public SendResult syncSend(String topic, String tag, String content) {
String destination = this.convertDestination(topic, tag);
return rocketMQTemplate.syncSend(destination, content);
}
示例代码:消费者
@RocketMQMessageListener(consumerGroup = ShareRocketMqConstants.GROUP_PREFIX + ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
topic = ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
selectorExpression = "2||3||4",
consumeThreadMax = 3)
public class ShareRsrcMqConsumer implements RocketMQListener {
... ...
}
上述消费端代码中,申明只消费tag值为:2、3、4 的消息,注解中核心有两个属性:
selectorType:
默认值就是 SelectorType.TAG,所以示例代码中没有设置。selectorExpression:
对应的表达式。针对 SelectorType.TAG类型的,就需要设置 tag的表达式。默认值是*
,即所有tag都消费。如果想要指定消费多个tag,则用||
或符合来连接。
5. 延迟/定时消息 定时消息是指消息发到broker后,不能立刻被consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
原理其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为
SCHEDULE_TOPIC_XXXX
的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。延迟级别尽管 rocketmq 支持定时消息,但是当前开源版本的 rocketmq 所支持的定时时间是有限的、不同级别的精度的时间,并不是任意无限制的定时时间。默认 Broker服务器端有18个定时级别,每一个级别分别对应不同的延迟时间:
延迟级别 | 延迟时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
代码发送延迟消息并没有特殊的方法,而是基于普通发消息的方法(如:rocketMQTemplate.syncSend)做了重载,增加了一个传入参数
int delayLevel
,默认值为 0,即立即发送。/**
* 同步延迟发送
*
* @param delayLevel 延时等级:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级 1 到 18
*1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public SendResult syncSendDelay(String topic, String tag, String content, long timeout, int delayLevel) {
String destination = this.convertDestination(topic, tag);
Message message = MessageBuilder.withPayload(content).build();
return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
}
6. 顺序消息 顺序消息是一种对消息发送和消费顺序有严格要求的消息,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
RocketMQ目前只能保证同一个分区队列内的顺序消息,因此实现下列场景的方式有:
- 分区有序: RocketMQ 支持同一个队列分区内的顺序消息。另外某个 Topic 下,所有消息根据 ShardingKey 进行分区,相同 ShardingKey 的消息必须被发送到同一个分区队列。因此只要保证消息按照同一 ShardingKey 发送即可,然后保证 Consumer 同一个队列单线程消费即可。
- 全局有序: 当设置 Topic 下只有一个分区时,可以实现全局有序。
最常见将订单号转换为 ShardingKey 的方式就是 hashKey。
生产者
/**
* 同步顺序发送
*
* @param hashKey 根据 hashKey 和 队列size() 取模,保证同一 hashKey 的消息发往同一个队列,以实现 同一hashKey下的消息 顺序发送
*因此 hashKey 建议取 业务上唯一标识符,如:订单号,只需保证同一订单号下的消息顺序发送
*/
public SendResult syncSendOrderly(String topic, String tag, String content, String hashKey) {
String destination = this.convertDestination(topic, tag);
Message message = MessageBuilder.withPayload(content).build();
return rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
}
消费者针对顺序消息的消费,代码也很容易,主要是
@RocketMQMessageListener
注解,通过设置了consumeMode = ConsumeMode.ORDERLY
,表示使用顺序消费。ConsumeMode 有两种值:
- CONCURRENTLY:默认值,并发同时接收异步传递的消息。
- ORDERLY:顺序消费时开启,只开启一个线程,同一时间只有序接收一个队列的消息。
@RocketMQMessageListener(topic = "xxx-topic",
consumerGroup = "xxxGroup",
consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener {
@Override
public void onMessage(String message) {
... ...
}
}
提问:如果针对顺序消息的消费者,同时启动了多个spring实例,会影响吗?这个问题当时想了好久,为了保证消息按照顺序消费,消费者是单线程消费的。可实际线上程序都不会是单节点,如果有多个spring实例,不是也可以理解成“多线程”处理了吗?
首先,回顾几个知识点吧:
- 顺序消费只能针对 “集群模式” 消费,即
messageModel = MessageModel.CLUSTERING
。 - 集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
- 虽然消费者代码中, RocketMQ 监听器像是mq中的“推模式”。但实际上,RocketMQ消息推模式基于拉模式实现,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
7. 事务消息 这里着重说明一下,RocketMQ 的事务机制,和我们通常说的通过MQ来实现最终一致性的分布式事务机制,不是一个事情。
RocketMQ 的事务机制,只体现在生产者,保障的是生产者本地的事务执行、发消息,这两个事务达成一致性。至于消费者收到消息后的事务处理,并不在当前机制内。
正常事务的流程正常事务的流程,遵循的是
2PC
的方案。- 调用发送事务消息方法,正常发送消息。发送事务的方法名为
syncSendInTransaction
。 - mq服务器端成功接收到消息后,消息处于一个半接收的状态,并响应给生产者客户端。
- 生产者收到服务器端成功接收的响应后,执行本地事务。本地事务写在
executeLocalTransaction
方法里面,返回结果为枚举RocketMQLocalTransactionState
,有:COMMIT、ROLLBACK、UNKNOWN
三种值。 - 服务器端收到
COMMIT
状态后,会把消息下发给消费者 - 服务器端收到
ROLLBACK
状态后,会删除掉当前半接收状态的消息,不再处理。 - 服务器端收到
UNKNOWN
状态,或者服务器端超时未收到消息,或者生产者未响应状态,则将进行消息补偿机制。
消息补偿机制这部分比较简单,针对上述事务流程第6点的几种情况,会触发消息回查。
- 当事务消息出现
UNKNOWN
、超时、未响应时,服务器会主动调用生产者本地的回查方法checkLocalTransaction
,查询本地事务执行情况,返回结果还是枚举值RocketMQLocalTransactionState
。 - 服务器接收到返回结果的处理流程和前面的正常流程一样。
- 如果依然是
UNKNOWN
、超时、未响应,将继续重试。如果超过最大重试次数后,依然无果,则视为ROLLBACK
,删除当前消息。
org.apache.rocketmq.common.BrokerConfig
类中定义,例如以下几个常用属性的默认值:transactionTimeOut = 6000L
:服务器未收到事务本地消息的超时时间为1分钟。transactionCheckMax = 15
:消息补偿机制中的最大回查次数为15次。transactionCheckInterval = 6000L
:消息补偿机制中每次回查的时间间隔为1分钟。
BrokerConfig
类中的属性,因此如果不想用默认值,可以在 broker.conf
文件中自定义修改。代码:生产者发送事务消息
/**
* 事务发送
*/
public TransactionSendResult syncSendInTransaction(String topic, String tag, String content) {
String destination = this.convertDestination(topic, tag);
String transactionId = UUID.randomUUID().toString();
Message message = MessageBuilder.withPayload(content)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
return rocketMQTemplate.sendMessageInTransaction(destination, message, content);
}
代码:生产者定义事务本地方法在生产者客户端,通过事务监听器,实现
RocketMQLocalTransactionListener
接口的两个上述方法。@RocketMQTransactionListener
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("executeLocalTransaction: "+ LocalDateTime.now());
return RocketMQLocalTransactionState.UNKNOWN;
}@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("checkLocalTransaction: "+ LocalDateTime.now());
return RocketMQLocalTransactionState.COMMIT;
}
}
提问:如果spring项目中有多个事务消息生产者,怎么区分不同的RocketMQLocalTransactionListener
?
@RocketMQLocalTransactionListener
这个注解提供了属性,可以区分不同的事务消息生产者。在 stater 2.0.4 版本中,是提供 txProducerGroup 这个属性指向一个消息发送者组,映射不同的事务消息发送逻辑。但好像有bug,在后续新的版本迭代中,去掉了这个属性。到了 2.1.1 版本,只能通过指定
rocketMQTemplateBeanName
来实现,即不同的事务消息发送时,就得定义不同的 RocketMQTemplate。挺麻烦的,期待这个功能在后续的迭代中完善好。8. 重试队列、死信队列 RocketMQ 很多应多异常的保全机制,例如消息重发的机制,这里可以分两类:
- 生产者重发: 在前面介绍三种发送消息方式时,针对同步、异步发送失败时,都会再重发,相应重发次数分别对应
DefaultMQProducer
类中属性值retryTimesWhenSendFailed
、retryTimesWhenSendAsyncFailed
,也可以在 properties 配置文件中自定义设置。 - 消费者重发: 当消息已经进入 broker 后,消费者接收失败,broker 也会给消费者重发,以下衍生出本次的重试队列、死信队列。
重试队列如果消费者端因为各种类型异常导致本次消费失败,为防止该消息丢失而需要将其重新回发给broker端保存,保存这种因为异常无法正常消费而回发给mq的消息队列称之为重试队列。
RocketMQ 会为每个消费组都设置一个 topic 名称为
“%RETRY%+consumerGroup”
的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。用于暂时保存因为各种异常而导致消费者端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 topic 名称为
“SCHEDULE_TOPIC_XXXX”
的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。死信队列由于有些原因导致消费者端长时间的无法正常消费从 broker 端 pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。
在RocketMQ中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个 topic 命名为
“%DLQ%+consumerGroup"
的死信队列。但如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列的。【RocketMQ - 应用篇】因为死信队列中的消息是无法被消费的,它也证实了一部分消息出现了意料之外的情况。因此一般在实际应用中,移入至死信队列的消息,需要人工干预处理。例如通过 console 查看是否有私信队列,当解决问题后,可在 console 上手动重发消息。
推荐阅读
- RocketMQ - 理论篇
- RocketMQ-单机版安装及远程连接测试
- RocketMQ安装及测试
- RocketMQ原理
- Kafka|Kafka VS RocketMQ VS RabbitMQ
- 【运维】【一】RocketMQ 运维命令详解
- RocketMQ
- RocketMQ遇到的坑
- Rocketmq创建topic报错org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to cal