RocketMQ - 应用篇

本文侧重讲解 RocketMQ 的实际应用,关于理论部分,在另外一篇文章中再做探讨。在此不多说,直接进入实战吧。
1. 配置 通常开发直接依赖 rocketmq-spring-boot-starter 即可,starter 中包含了所有所需的依赖,如:

  • rocketmq-client:封装了客户端的应用程序,还包含了netty的通讯服务。
  • rocketmq-acl:访问权限控制服务。
starter 还提供了很多现成封装类,如:RocketMQTemplate.javaRocketMQListener.javaRocketMQUtil.java 等,在应用开发时会经常用到。
建议直接用上述的 rocketmq-spring-boot-starter,见过有公司为了内部兼容,自己封装了一个服务替代官方的 starter。但这个服务除了增加部分自定义程序外,其他的类和方法都是照拷贝 starter 的。
当后续 rocketmq-spring-boot-starter 升级了,或修复bug、或拓展功能,公司内部的服务就很难升级了,除非再从头拷贝一遍。当公司内部没有相应的体量,建议不要学大厂自己封装基础服务,否则容易骑虎难下。
pom依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0

写文章时,starter 最新的版本是 2.2.0,对应的 rocketmq-clientrocketmq-acl版本是 4.8.0
认准版本好很重要,因为 rocketmq-spring-boot-starter 一直在快速迭代中,很多类和方法,在新版本中都会改变,例如下文会提到的tag、消息事务等,这是也是为什么不建议公司内部封装 starter。
application
# rocketmq 配置项,对应 RocketMQProperties 配置类 rocketmq: name-server: 127.0.0.1:9876 # RocketMQ Namesrv # Producer 配置项 producer: group: koala-dev-event-centre-group # 生产者分组 send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 secret-key: # Secret Key enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 # Consumer 配置项 consumer: listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 erbadagang-consumer-group: topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

rocketmq 配置很多,除了基础有关server的配置以外,还有acl、producer、consumer等。但通常一个服务内会有多个consumer,建议在代码中实现。而producer如果只有一个,可以配置。
2.普通消息发送 有关 rocketmq 发送消息的源码,建议查看 org.apache.rocketmq.client.producer.DefaultMQProducer ,类中的属性大多对应于配置文件中的参数。
2.1. 三种消息发送
这里只讨论普通的消息发送方式,区别于顺序、事务、延迟/定时等消息发送方式,当前分为三种:
  • 同步(sync): 同步发送就是指 producer 发送消息后,会同步等待,在接收到 broker 响应结果后才继续发下一条消息。
  • 异步(async): 异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。异步发送同样可以对消息的响应结果进行处理,需要在发送消息时实现异步发送回调接口。
  • 单方向(oneWay): 是一种单方向通信方式,也就是说 producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。
三种发送方式对比
发送方式 发送TPS 发送结果响应 可靠性 使用场景
同步 一般 重要的通知场景
异步 比较注重 RT(响应时间)的场景
单方向 最快 可靠性要求并不高的场景
async 异步执行的线程池配置
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60000L, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } });

三种发送消息代码
private String convertDestination(String topic, String tag) { return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag); }/** * 同步发送 */ public SendResult syncSend(String topic, String tag, String content) { String destination = this.convertDestination(topic, tag); return rocketMQTemplate.syncSend(destination, content); }/** * 异步发送 */ public void asyncSend(String topic, String tag, String content, SendCallback sendCallback) { String destination = this.convertDestination(topic, tag); rocketMQTemplate.asyncSend(destination, content, sendCallback); }/** * 单向发送 */ public void sendOneWay(String topic, String tag, String content) { String destination = this.convertDestination(topic, tag); rocketMQTemplate.sendOneWay(destination, content); }

2.2. 批量发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。
当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize,即配置文件中的rocketmq.producer.max-message-size
代码
/** * 同步-批量发送 */ public SendResult syncBatchSend(String topic, String tag, List contentList) { String destination = this.convertDestination(topic, tag); List messageList = contentList.stream() .map(content -> MessageBuilder.withPayload(content).build()) .collect(Collectors.toList()); return rocketMQTemplate.syncSend(destination, messageList); }

4. 标签tag rocketmq中,topic与tag都是业务上用来归类的标识,区分在于topic是一级分类,而tag可以理解为是二级分类。定义上:
  • topic: 消息主题,通过Topic对不同的业务消息进行分类。
  • tag: 消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。
实际业务中,什么时候该用topic或tag呢?有以下几种建议:
  • 消息类型是否一致: 如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。
  • 业务是否相关联: 没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分。
  • 消息优先级是否一致: 如同样是物流消息,盒马必须小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分。
  • 消息量级是否相当: 有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。
举个实际项目的例子吧:我刚刚做的一个项目叫共享中心,所有需要共享的资源都来自于各个业务服务方。创建共享资源、撤回共享资源等这类指令,我将其定义为不同的topic。而发送给topic的消息体中有“资源类型”的字段,每个业务接入方其实只关心对应自己资源类型的消息,那么就将“资源类型”定义为tag,各个业务方的消费端只监听自己所需的tag即可。
如果没有tag的机制,消费端就得接收所有消息,反序列化后只处理自己对应“资源类型”的消息。有了tag机制,消息在进入消费端途中就自动进行过滤分发。
与 RabbitMQ AMQP协议 比较
在使用tag机制后,第一时间让我想到了 RabbitMQ 的 AMQP协议,很像交换机和队列的机制。当使用tag之后,像扇形交换机;当没使用tag之后,就像直连交换机。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
我想,二者设计的目的都是一样的,就是让生产者和消费者之间解耦,让一个消息,可以自由地流转到不同的消息端。RocketMQ 在这点,相较于 RabbitMQ 而言,提供的功能不够丰富,但更实用、简洁。
示例代码:生产者
通过前面示例代码中,最简单的发送同步消息代码来看,最新 starter 中封装的 RocketMQTemplate 类中,消息发送的目标是 String destination。而 它包含了 topictag。即私有转换方法中的: destination = topic:tag
private final RocketMQTemplate rocketMQTemplate; private String convertDestination(String topic, String tag) { return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag); }/** * 同步发送 */ public SendResult syncSend(String topic, String tag, String content) { String destination = this.convertDestination(topic, tag); return rocketMQTemplate.syncSend(destination, content); }

示例代码:消费者
@RocketMQMessageListener(consumerGroup = ShareRocketMqConstants.GROUP_PREFIX + ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK, topic = ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK, selectorExpression = "2||3||4", consumeThreadMax = 3) public class ShareRsrcMqConsumer implements RocketMQListener { ... ... }

上述消费端代码中,申明只消费tag值为:2、3、4 的消息,注解中核心有两个属性:
  • selectorType:默认值就是 SelectorType.TAG,所以示例代码中没有设置。
  • selectorExpression:对应的表达式。针对 SelectorType.TAG类型的,就需要设置 tag的表达式。默认值是 * ,即所有tag都消费。如果想要指定消费多个tag,则用 || 或符合来连接。
注意: 生产者端,发消息只能指定一个tag。但消费者端,接收消息可以指定多个tag。
5. 延迟/定时消息 定时消息是指消息发到broker后,不能立刻被consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
原理
其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为 SCHEDULE_TOPIC_XXXX 的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。
延迟级别
尽管 rocketmq 支持定时消息,但是当前开源版本的 rocketmq 所支持的定时时间是有限的、不同级别的精度的时间,并不是任意无限制的定时时间。默认 Broker服务器端有18个定时级别,每一个级别分别对应不同的延迟时间:
延迟级别 延迟时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
代码
发送延迟消息并没有特殊的方法,而是基于普通发消息的方法(如:rocketMQTemplate.syncSend)做了重载,增加了一个传入参数 int delayLevel,默认值为 0,即立即发送。
/** * 同步延迟发送 * * @param delayLevel 延时等级:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级 1 到 18 *1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public SendResult syncSendDelay(String topic, String tag, String content, long timeout, int delayLevel) { String destination = this.convertDestination(topic, tag); Message message = MessageBuilder.withPayload(content).build(); return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel); }

6. 顺序消息 顺序消息是一种对消息发送和消费顺序有严格要求的消息,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
RocketMQ目前只能保证同一个分区队列内的顺序消息,因此实现下列场景的方式有:
  • 分区有序: RocketMQ 支持同一个队列分区内的顺序消息。另外某个 Topic 下,所有消息根据 ShardingKey 进行分区,相同 ShardingKey 的消息必须被发送到同一个分区队列。因此只要保证消息按照同一 ShardingKey 发送即可,然后保证 Consumer 同一个队列单线程消费即可。
  • 全局有序: 当设置 Topic 下只有一个分区时,可以实现全局有序。
全局有序的性能太差,推荐使用分区有序。假设我们要通过mq处理订单内的消息。同一个 topic,通常我们只需要保证同一个订单下的消息顺序发布和消费即可,不同订单下的消息应该互不干扰。因此可以采用分区有序,将订单号转换为 ShardingKey,只要保证同一个订单下的消息都流转到同一个队列下,然后顺序消费。
最常见将订单号转换为 ShardingKey 的方式就是 hashKey。
生产者
/** * 同步顺序发送 * * @param hashKey 根据 hashKey 和 队列size() 取模,保证同一 hashKey 的消息发往同一个队列,以实现 同一hashKey下的消息 顺序发送 *因此 hashKey 建议取 业务上唯一标识符,如:订单号,只需保证同一订单号下的消息顺序发送 */ public SendResult syncSendOrderly(String topic, String tag, String content, String hashKey) { String destination = this.convertDestination(topic, tag); Message message = MessageBuilder.withPayload(content).build(); return rocketMQTemplate.syncSendOrderly(destination, message, hashKey); }

消费者
针对顺序消息的消费,代码也很容易,主要是 @RocketMQMessageListener 注解,通过设置了consumeMode = ConsumeMode.ORDERLY,表示使用顺序消费。
ConsumeMode 有两种值:
  • CONCURRENTLY:默认值,并发同时接收异步传递的消息。
  • ORDERLY:顺序消费时开启,只开启一个线程,同一时间只有序接收一个队列的消息。
@RocketMQMessageListener(topic = "xxx-topic", consumerGroup = "xxxGroup", consumeMode = ConsumeMode.ORDERLY) public class OrderConsumer implements RocketMQListener { @Override public void onMessage(String message) { ... ... } }

提问:如果针对顺序消息的消费者,同时启动了多个spring实例,会影响吗?
这个问题当时想了好久,为了保证消息按照顺序消费,消费者是单线程消费的。可实际线上程序都不会是单节点,如果有多个spring实例,不是也可以理解成“多线程”处理了吗?
首先,回顾几个知识点吧:
  • 顺序消费只能针对 “集群模式” 消费,即 messageModel = MessageModel.CLUSTERING
  • 集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
  • 虽然消费者代码中, RocketMQ 监听器像是mq中的“推模式”。但实际上,RocketMQ消息推模式基于拉模式实现,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
顺序消息消费时,在每次拉取任务时,都会像broker申请锁住该队列。因此,就算有多个消费者实例同时在运行,针对单个队列中的顺序消息,依然是顺序消费的。
7. 事务消息 这里着重说明一下,RocketMQ 的事务机制,和我们通常说的通过MQ来实现最终一致性的分布式事务机制,不是一个事情。
RocketMQ 的事务机制,只体现在生产者,保障的是生产者本地的事务执行、发消息,这两个事务达成一致性。至于消费者收到消息后的事务处理,并不在当前机制内。
正常事务的流程
正常事务的流程,遵循的是 2PC 的方案。
  1. 调用发送事务消息方法,正常发送消息。发送事务的方法名为 syncSendInTransaction
  2. mq服务器端成功接收到消息后,消息处于一个半接收的状态,并响应给生产者客户端。
  3. 生产者收到服务器端成功接收的响应后,执行本地事务。本地事务写在 executeLocalTransaction 方法里面,返回结果为枚举 RocketMQLocalTransactionState,有:COMMIT、ROLLBACK、UNKNOWN 三种值。
  4. 服务器端收到 COMMIT 状态后,会把消息下发给消费者
  5. 服务器端收到 ROLLBACK 状态后,会删除掉当前半接收状态的消息,不再处理。
  6. 服务器端收到 UNKNOWN 状态,或者服务器端超时未收到消息,或者生产者未响应状态,则将进行消息补偿机制。
消息补偿机制
这部分比较简单,针对上述事务流程第6点的几种情况,会触发消息回查。
  1. 当事务消息出现 UNKNOWN、超时、未响应时,服务器会主动调用生产者本地的回查方法 checkLocalTransaction,查询本地事务执行情况,返回结果还是枚举值 RocketMQLocalTransactionState
  2. 服务器接收到返回结果的处理流程和前面的正常流程一样。
  3. 如果依然是 UNKNOWN、超时、未响应,将继续重试。如果超过最大重试次数后,依然无果,则视为 ROLLBACK,删除当前消息。
事务消息相关的参数,基本在 org.apache.rocketmq.common.BrokerConfig 类中定义,例如以下几个常用属性的默认值:
  • transactionTimeOut = 6000L:服务器未收到事务本地消息的超时时间为1分钟。
  • transactionCheckMax = 15:消息补偿机制中的最大回查次数为15次。
  • transactionCheckInterval = 6000L:消息补偿机制中每次回查的时间间隔为1分钟。
因为是 BrokerConfig 类中的属性,因此如果不想用默认值,可以在 broker.conf 文件中自定义修改。
代码:生产者发送事务消息
/** * 事务发送 */ public TransactionSendResult syncSendInTransaction(String topic, String tag, String content) { String destination = this.convertDestination(topic, tag); String transactionId = UUID.randomUUID().toString(); Message message = MessageBuilder.withPayload(content) .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .build(); return rocketMQTemplate.sendMessageInTransaction(destination, message, content); }

代码:生产者定义事务本地方法
在生产者客户端,通过事务监听器,实现 RocketMQLocalTransactionListener 接口的两个上述方法。
@RocketMQTransactionListener public class LocalTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("executeLocalTransaction: "+ LocalDateTime.now()); return RocketMQLocalTransactionState.UNKNOWN; }@Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("checkLocalTransaction: "+ LocalDateTime.now()); return RocketMQLocalTransactionState.COMMIT; } }

提问:如果spring项目中有多个事务消息生产者,怎么区分不同的RocketMQLocalTransactionListener?
@RocketMQLocalTransactionListener 这个注解提供了属性,可以区分不同的事务消息生产者。在 stater 2.0.4 版本中,是提供 txProducerGroup 这个属性指向一个消息发送者组,映射不同的事务消息发送逻辑。但好像有bug,在后续新的版本迭代中,去掉了这个属性。
到了 2.1.1 版本,只能通过指定 rocketMQTemplateBeanName 来实现,即不同的事务消息发送时,就得定义不同的 RocketMQTemplate。挺麻烦的,期待这个功能在后续的迭代中完善好。
8. 重试队列、死信队列 RocketMQ 很多应多异常的保全机制,例如消息重发的机制,这里可以分两类:
  • 生产者重发: 在前面介绍三种发送消息方式时,针对同步、异步发送失败时,都会再重发,相应重发次数分别对应 DefaultMQProducer 类中属性值 retryTimesWhenSendFailedretryTimesWhenSendAsyncFailed ,也可以在 properties 配置文件中自定义设置。
  • 消费者重发: 当消息已经进入 broker 后,消费者接收失败,broker 也会给消费者重发,以下衍生出本次的重试队列、死信队列。
重试队列
如果消费者端因为各种类型异常导致本次消费失败,为防止该消息丢失而需要将其重新回发给broker端保存,保存这种因为异常无法正常消费而回发给mq的消息队列称之为重试队列。
RocketMQ 会为每个消费组都设置一个 topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。
用于暂时保存因为各种异常而导致消费者端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 topic 名称为“SCHEDULE_TOPIC_XXXX” 的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
死信队列
由于有些原因导致消费者端长时间的无法正常消费从 broker 端 pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。
在RocketMQ中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个 topic 命名为 “%DLQ%+consumerGroup" 的死信队列。但如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列的。
【RocketMQ - 应用篇】因为死信队列中的消息是无法被消费的,它也证实了一部分消息出现了意料之外的情况。因此一般在实际应用中,移入至死信队列的消息,需要人工干预处理。例如通过 console 查看是否有私信队列,当解决问题后,可在 console 上手动重发消息。

    推荐阅读