RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会。
我之前写过 RabbitMQ 的文章,毕竟先入为主,后续在介绍 RocketMQ 的功能时,可能会穿插地拿 RabbitMQ 做比较。当前公司内的消息中间件选型,也是从 RabbitMQ 转为了 RocketMQ,技术总监告诉我的理由也很简单,因为 RocketMQ 的分布式集群可用性更高,运维更简单。
也的确,抛去为大数据而生的 Kafka 不说,这二者除了在架构和使用方式上差距很大,但在实际应用中的性能、效果上差距不大。有人说 RabbitMQ 的响应速度更快,有人说 RocketMQ 的数据吞吐量更高,但也是差距不大,各有千秋。国内也没有多少公司有那么大的体量,对性能那么较真。
1. 基本组件
1.1. 名词概念
Name ServerName Server 是 RocketMQ 集群的协调者,集群的各个组件是通过 Name Server 获取各种属性和地址信息的。主要功能包括两部分:
- 各个 Broker 定期上报自己的状态信息到 Name Server,维持心跳。
- 各个客户端,包括Producer、Consumer,以及命令行工具,通过 Name Server 获取 Broker 等最新的状态信息。
可以把 Name Server 类比成 Kafka 中的 ZooKeeper,那为什么不直接用 ZooKeeper 呢?因为 RocketMQ 只能用到 ZooKeeper 的少部分功能,直接用会显得太重,就自己开发了相较而言更轻量级、更满足自身特性的 Name Server。
BrokerBroker 主要负责消息的存储、投递和查询以及服务高可用保证,说白了就是 RocketMQ 的服务器。Broker 是中间件的核心,绝对不能挂,更是要保障它的可靠性,通常会搭建主从高可用架构,因此 Broker 有分 Master Broker(BrokerId 为0)和 Slave Broker(BrokerId 非0)。
每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
Name Server 集群节点相互不通讯,所以上报信息时需要上报所有节点。另外 Name Server 是无状态的,即数据并不会做持久化存储,全部存储在内存中,重启后即消失。
Producer与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
Consumer与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。
Topic消息主题,一级消息分类,通过Topic对不同的业务消息进行分类。
Tag消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。
Queue对于 RocketMQ 来说,Topic 是逻辑上的概念,一个 Topic 可以分布在各个 Broker 上,把一个 Topic 分布在一个 Broker 上的子集定义为一个 Topic 分片。在分片基础上再等分为若干份(可指定份数)后的其中一份,就是 Queue 队列。
Queue 是负载均衡过程中资源分配的基本单元。
1.2. Consumer Group 和 Queue
Consumer Group 消费者组在实际消费消息时,都需要申明 Topic名、消费者组名。在集群消费模式下,同一个 Topic 内的消息,会分别分发给同一个 Consumer Group 内的不同 Consumer,以达到负载均衡的效果。
怎么理解同一个 Consumer Group 内的不同 Consumer 呢?不同业务系统基于同一个Consumer Group 消费同一个 Topic 内的消息算是;为了提高并发量,直接将某个业务系统的进程横向拓展(如:k8s 中增加几个pod )也算是。
但如果只是在某个 Consumer 的代码中,增加几个线程,如
@RocketMQMessageListener.consumeThreadMax
,不算是增加 Consumer。只是作为一个 Consumer 在拉取了一批消息后,增加线程去并发执行。读、写队列在创建、更改 Topic 时,会要求设置读队列数、写队列数。
在发送消息时,会根据 Topic 写队列数返回路由信息;在消费消息时,会根据 Topic 读队列数返回路由信息。
读、写队列并非物理上完全对立的队列,如:
- 写队列8个,读队列4个: 会创建8个文件夹(0、1、2、3、4、5、6、7),消息会发送给这8个队列,但消费时只能消费到4个队列(0、1、2、3),另外4个队列(4、5、6、7)中的消息不会被消费到。
- 写队列4个,读队列8个: 消息只会发送给4个队列(0、1、2、3),消费时会从8个队列中消费消息,但只有(0、1、2、3)队列中有消息。如果某个消费者被分配了队列(4、5、6、7),则什么消息也收不到。
读队列数 = 写队列数
,那 RocketMQ 为什么还要多此一举呢?为了方便队列扩容、缩容。一个topic在每个broker上创建了128个队列,现在需要将队列缩容到64个,怎么做才能100%不会丢失消息,并且无需重启应用程序?最佳实践:先缩容写队列128->64,写队列由0 1 2 ……127缩至 0 1 2 ……..63。等到64 65 66……127中的消息全部消费完后,再缩容读队列128->64。如果同时缩容写队列和读队列,可能会导致部分消息未被消费。
Consumer Group 和 Queue 有关负载均衡既然是负载均衡,那么讨论的就是集群消费模式。之前说过,Queue 是负载均衡过程中资源分配的基本单元。
因此,在一个 Consumer Group 内,Consumer 和 Queue 是 1:n 的关系:
- 一个 Queue 最多只能分配给一个 Consumer。
- 一个 Cosumer 可以分配得到多个 Queue。
2. 高可用架构 前面介绍过 RocketMQ 各个组件的名词概念,那么现在说说,这些组件是如何搭建成 RocketMQ 的架构的。
2.1. Name Server 集群
是一个几乎无状态节点,可集群部署,集群节点间相互独立没有信息交换。其功能主要为更新和发现 Broker 服务,生产者或消费者能够通过其查找到各主题相应的 Broker IP 列表
之前说过,Name Server 是独立的,每台 NameServer 都会有完整的集群路由信息,包括所有的 Broker 节点的信息,我们的数据信息等等。所以只要任何一台 NamseServer 存活下来,就可以保存 RocketMQ 信息的正常运行,不会出现故障。
所以为了提高可用性,Name Server 的节点数至少是 2个及以上。虽然可以直接部署在 Broker 所处的机器上,但如果有条件最好单独部署。
2.2. Broker 集群
可以搭建的 Broker 集群有很多种,按照功能性来分:
- 单节点模式:就一个 Master Broker。
- 主从模式:每个 Master Broker 配多个 Slave Broker。只有 Master 接受 Topic 创建 Queue,消息写入等,Slave 只是同步 Master 上的这些数据。不过有同步/异步之分。(1)同步,只有当消息从 Master 同步到 Slave,才算消息发送成功;(2)异步,消息发送到 Master 就算发送成功,后续消息在从 Master 异步刷新到 Slave 上。
- 多 Master 模式:可以单是多个Master,也可以是多个 Master-Slave 主从,多个Master 可以提高消息并发性、高可用性,这也是为什么 Topic 会在各个 Master Broker 上创建分片队列。
- Dledger 模式:在主从模式中,Slave 挂掉了影响不大,可如果 Master 挂掉了就都不能用了。除非手动的将某个 Slave Broker 切换为新的 Master Broker。而 Dledger 就解决了这个问题,它可以监控一组 Broker,当 Master 挂掉后,会从余下的 Slave 中重新选举新的 Master。
看完这些,第一反应就是这和 Redis 的高可用集群架构好像,前面多 Master 多 Slave 基本一样,然后 RocketMQ 的 Dledger 模式,不就对应 Redis 中的 Sentinel 哨兵模式嘛。看来分布式发展到今天,对于高可用架构的方案逐渐稳定且统一了。
文章图片
2.3. Dledger 模式
DLedger 是 OpenMessaging 中一个基于 Raft 算法的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题。现在用于部署 RocketMQ 集群最常见的是用 RocketMQ-on-DLedger Group。
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
在基于 RocketMQ-on-DLedger Group 部署时,每个 Broker 节点的配置文件需要多加一下配置,当然还是针对RocketMQ 4.5.0 以上版本 :
enableDLegerCommitLog:是否启动 DLedger
dLegerGroup:DLedger Raft Group 的名字,建议和 brokerName 保持一致
dLegerPeers:DLedger Group 内各节点的地址与端口信息(同一个 Group 内的各个节点配置必须要保证一致)
dLegerSelfId:节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一,例如:第一个节点配置为”n0”,第二个节点配置为”n1”,第三个节点配置为”n2”
sendMessageThreadPoolNums:发送线程个数(建议配置成 CPU 核数)
2.4. 和 RabbitMQ 对比
开头说,我们的技术总监因为高可用集群而选择了 RabbitMQ,那么可以回顾一下 RabbitMQ 的集群是什么样的。
普通集群在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。创建的 Queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 Queue 的元数据(元数据可以认为是 Queue 的一些配置信息,通过元数据,可以找到 Queue 所在实例)。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 Queue 所在实例上拉取数据过来。
问题在于,如果消费者读取 Queue 所在的不是所属实例,还要从原实例拉取数据,有性能开销。可如果恰好是 Queue 实际所属实例,那和单节点有啥区别,依然有单节点的性能瓶颈。
而且这种集群模式,也只能提高了部分并发量,并没有高可用性。因为其他实例只同步了 Queue 的元数据,如果 Queue 所处实例宕机了,依然拉取不到 Queue 中的消息。
镜像集群镜像集群是在前者的模式做了更改。创建的 Queue 还是只会放在一个实例上,但其他每个实例不光同步 Queue 的元数据,还同步消息数据。
这样就有了高可用性,Queue 所属实例宕机了,未消费完的消息依然可以从其他实例中读到。
但这相当于每个实例上,都完整保存了所有队列的消息,不说性能,光对磁盘的要求有多大。
RocketMQ、RabbitMQ 对比反观 RocketMQ 的高可用架构则更科学:
- Topic 在不同 Master Broker 上分片,Queue 分散在不同集群。虽然 RocketMQ 的主从同步类似于镜像,也是将 Queue 的元数据和消息都同步过去。但毕竟只是经过拆分过的 Topic 部分数据,量没那么大。
- 读写分离,想拓展写并发就拓展 Master,想拓展读并发就拓展 Slave,更加灵活。
- RocketMQ 每个实例各有分工,甚至还有独立的 Name Server,所以对单个集群实例的性能消耗不大。而对于 RabbitMQ 而言,每个实例可能会有性能瓶颈,对机器上的物理资源可能要求较高。
控制台 consoleRocketMQ 不像 RabbitMQ 自带 console,不过它对外提供API。目前社区有很多可接入 RocketMQ console 的前端项目,目前我用的是
rocketmq-console-ng
,还挺不错的。接入 Prometheusconsole 毕竟在监控有限,目前使用最广泛监控解决方案的就是 Prometheus 了吧,RocketMQ 也提供接入的方案。
这里就提到主角
RocketMQ-Exporter
,目前已被 Prometheus 官方收录,其地址为 https://github.com/apache/roc...。它首先从 RocketMQ 集群采集数据,然后借助 Prometheus 提供的第三方客户端库将采集的数据规范化成符合 Prometheus 系统要求的数据,Prometheus 定时去从 Exporter 拉取数据即可。3.2. RocketMQ、RabbitMQ 对比
搭建运维上在运维搭建时,RabbitMQ 要简单的多,全程就一个 server,还包含了 console 控制台。
同样的情况在 RocketMQ 上则相对复杂,要分别搭建 Name Server、Broker(分 Master、Slave),要 console 也要自己搭。
但复杂有复杂的好处,在高可用集群上就体现出来了,这里不再赘言。
开发使用上RabbitMQ 的 AMQP 协议,相较于 Kafka、RocketMQ 来说,学习成本要高的多。无论是发送普通消息,还是复杂一点的像延迟消息,都需要理解和使用交换机和队列之间的配合。而在 RocketMQ 上使用时则简单的多,无法提供一个API即可,框架封装了很多实现的细节。
有人说 RocketMQ 框架本身较重,但重有重的好处,框架封装的越多,对于使用的人来说就越方便。像顺序消息、延迟消息、事务消息等,几乎可以开箱即用。最近在学车,感觉 RabbitMQ 像是手动挡,而 RocketMQ 像是自动挡,不知道这个比喻是否贴切。
3.3. docker compose 单机安装
【RocketMQ - 理论篇】docker-compose.yml
version: '3.5'
services:
rmqnamesrv:
image: rocketmqinc/rocketmq:4.4.0
container_name: rmqnamesrv
restart: always
ports:
- 9876:9876
environment:
#内存分配
JAVA_OPT_EXT: "-server -Xms1g -Xmx1g"
volumes:
- /Volumes/rocketmq/namesrv/logs:/root/logs
command: sh mqnamesrv
networks:
rmq:
aliases:
- rmqnamesrvrmqbroker:
image: rocketmqinc/rocketmq:4.4.0
container_name: rmqbroker
restart: always
depends_on:
- rmqnamesrv
ports:
- 10909:10909
- 10911:10911
volumes:
- /Volumes/rocketmq/broker/logs:/root/logs
- /Volumes/rocketmq/broker/store:/root/store
- /Volumes/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker-c /opt/rocketmq-4.4.0/conf/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPT_EXT: "-server -Xms1g -Xmx1g -Xmn1g"
networks:
rmq:
aliases:
- rmqbrokerrmqconsole:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
restart: always
ports:
- 9877:8080
depends_on:
- rmqnamesrv
volumes:
- /etc/localtime:/etc/localtime:ro
- /Volumes/rocketmq/console/logs:/root/logs
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
networks:
rmq:
aliases:
- rmqconsolenetworks:
rmq:
name: rmq
driver: bridge
推荐阅读
- RocketMQ - 应用篇
- RocketMQ-单机版安装及远程连接测试
- RocketMQ安装及测试
- RocketMQ原理
- Kafka|Kafka VS RocketMQ VS RabbitMQ
- 【运维】【一】RocketMQ 运维命令详解
- RocketMQ
- RocketMQ遇到的坑
- Rocketmq创建topic报错org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to cal