RocketMQ——RocketMQ搭建及问题解决

RocketMQ——RocketMQ搭建及问题解决
文章目录

  • RocketMQ——RocketMQ搭建及问题解决
    • RocketMQ简介
    • RocketMQ安装
      • 下载地址
      • 系统要求
      • 准备工作
    • 单机模式安装
      • 启动nameserver
      • 启动broker
      • 测试
      • 关闭消息队列
      • 问题
        • 解决方案
    • 集群搭建
      • 启动NameServer
      • 启动broker
        • 修改配置文件
          • 修改机器1的配置文件(192.168.108.128)
          • 修改机器2的配置文件(192.168.108.129)
        • 启动四个broker
      • 问题
        • 解决方案
      • 测试
        • 生产者代码:
        • 消费者代码
        • 测试结果
    • 可视化管理平台

RocketMQ简介 RocketMQ是一个开源的分布式消息和流数据平台,由阿里研发目前属于apache顶级项目:http://rocketmq.apache.org/
RocketMQ消息队列主要功能功能:引用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容
RocketMQ安装 下载地址
RocketMQ 4.5.1
系统要求
64位 Linux、unix或mac; JDK版本1.8以上;

准备工作
由于下载的是zip压缩格式文件,因此在linux上安装unzip来进行解压
yum install -y unzip

使用unzip命令解压
unzip rocketmq-all-4.5.1-bin-release.zip

单机模式安装 启动nameserver
两种启动方式
  1. 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入./mqnamesrv进行启动
[root@node1 bin]# ./mqnamesrv Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON

命令行提示启动成功
  1. 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入指令nohup sh bin/mqnamesrv &
[root@node1 bin]# nohup sh ./mqnamesrv & [1] 68705

命令行显示启动的进程号,查看启动日志
namesrv.log ... The Name Server boot success. serializeType=JSON

日志提示启动成功
启动broker
两种启动方式:
  1. 直接运行mqbroker
./mqbroker -n localhost:9876 The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

或者
./mqbroker The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON

提示启动成功
  1. 通过nohup指令启动
nohup sh ./mqbroker -n localhost:9876 & [2] 69248

查看broker启动日志
tail -f ~/logs/rocketmqlogs/broker.log ... INFO main - The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

提示启动成功
测试
运行源文件中以写好的测试demo
export NAMESRV_ADDR=localhost:9876 # 生产者 sh tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId=AC110001104B2B193F2D6EA72CC003E7... # 消费者 sh tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179...

单点消息队列启动成功
关闭消息队列
通过mqshutdown命令关闭消息队列,依次关闭nameserver和broker
sh mqshutdown broker The mqbroker(69255) is running... Send shutdown request to mqbroker(69255) OK

sh mqshutdown namesrv The mqnamesrv(68711) is running... Send shutdown request to mqnamesrv(68711) OK

问题
broker内存不够
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory. # An error report file with more information is saved as: # /component/rocketmq-all-4.5.1-bin-release/bin/hs_err_pid68880.log

解决方案 报错原因是虚拟机启动内存不够,修改bin下的服务启动脚本 runserver.sh 、runbroker.sh 中对于内存的限制,
runserver.sh: JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"runbroker.sh: JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

改成如下示例:
runserver.sh: JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"runbroker.sh: JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

集群搭建 通过两台虚拟机(192.168.108.128和192.168.108.129)模拟集群
启动NameServer
分别启动两台虚拟机的NameServer,启动方式与单机模式相同
启动broker
两台虚拟机的broker互为主备,即各自启动一个Master和一个Slave,在启动broker之前需要修改相关配置文件
修改配置文件 在目录conf/2m-2s-sync中(两主两从,同步更新),该目录下有四个配置文件,分别对应两个Master和两个Slave配置
├── broker-a.properties#Master-a配置 ├── broker-a-s.properties #Slave-a配置 ├── broker-b.properties#Master-b配置 └── broker-b-s.properties #Slave-b配置

在这里我们需要修改192.168.108.128的broker-a.properties和broker-b-s.properties,即128这台机器是名字为broker-a的broker的主节点,是名称为broker-b的broker的从节点;相对应的192.168.108.129是名字为broker-a的broker的从节点,是名称为broker-b的broker的主节点,因此修改129机器的broker-b.properties和broker-a-s.properties配置文件。修改如下:
修改机器1的配置文件(192.168.108.128) master:broker-a.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址 listenPort=10911#broke监听端口号 storePathRootDir=/home/rocketmq/store-a#存储消息和配置信息路径 brokerClusterName=DefaultCluster#集群名称 brokerName=broker-a#broke名称,集群中master和slave通过相同的broke名来进行关联 brokerId=0#0表示master,大于0为slave对应ID deleteWhen=04#删除消息时间,04表示凌晨4点 fileReservedTime=48#磁盘上保存消息的时长,单位小时 brokerRole=SYNC_MASTER#broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE flushDiskType=ASYNC_FLUSH#刷盘策略

slave:broker-b-s.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址 listenPort=11011#broke监听端口号 storePathRootDir=/home/rocketmq/store-b#存储消息和配置信息路径 brokerClusterName=DefaultCluster#集群名称 brokerName=broker-b#broke名称,集群中master和slave通过相同的broke名来进行关联 brokerId=1#0表示master,大于0为slave对应ID deleteWhen=04#删除消息时间,04表示凌晨4点 fileReservedTime=48#磁盘上保存消息的时长,单位小时 brokerRole=SLAVE#broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE flushDiskType=ASYNC_FLUSH#刷盘策略

修改机器2的配置文件(192.168.108.129) master:broker-b.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址 listenPort=10911#broke监听端口号 storePathRootDir=/home/rocketmq/store-b#存储消息和配置信息路径 brokerClusterName=DefaultCluster#集群名称 brokerName=broker-b#broke名称,集群中master和slave通过相同的broke名来进行关联 brokerId=0#0表示master,大于0为slave对应ID deleteWhen=04#删除消息时间,04表示凌晨4点 fileReservedTime=48#磁盘上保存消息的时长,单位小时 brokerRole=SYNC_MASTER#broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE flushDiskType=ASYNC_FLUSH#刷盘策略

slave:broker-a-s.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址 listenPort=11011#broke监听端口号 storePathRootDir=/home/rocketmq/store-a#存储消息和配置信息路径 brokerClusterName=DefaultCluster#集群名称 brokerName=broker-a#broke名称,集群中master和slave通过相同的broke名来进行关联 brokerId=1#0表示master,大于0为slave对应ID deleteWhen=04#删除消息时间,04表示凌晨4点 fileReservedTime=48#磁盘上保存消息的时长,单位小时 brokerRole=SLAVE#broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE flushDiskType=ASYNC_FLUSH#刷盘策略

启动四个broker
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties & nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties & nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties & nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties &

2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.108.128:10911 2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.108.128:10911 2019-07-25 14:00:20 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.108.129:9876 OK 2019-07-25 14:00:20 INFO brokerOutApi_thread_1 - register broker[0]to name server 192.168.108.128:9876 OK

名两行出现broker注册到nameserver成功,则集群搭建成功
问题
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.2:10911> failed

解决方案 出现该问题是因为虚拟机中搭建了docker产生了虚拟网卡过多,RocketMQ在访问docker网络时不通产生问题。
首先关闭docker服务
systemctl stop docker systemctl is-enableddocker#查询是否自启动 systemctl disabledocker#禁止自启动

然后重启虚拟机即可解决
测试
通过java代码对集群进行测试
生产者代码:
public class producer { public static void main(String[] args) throws Exception { //创建生产者,并设置group DefaultMQProducer producer = new DefaultMQProducer("group"); //设置nameserver producer.setNamesrvAddr("192.168.108.128:9876"); //启动生产者 producer.start(); //创建发送的消息,发送100条数据 for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA",("MQ Test"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送 SendResult result = producer.send(msg); System.out.println(result); } //关闭生产者 producer.shutdown(); } }

消费者代码
public class consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group"); consumer.setNamesrvAddr("192.168.108.128:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive Message: " + list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); System.out.println("consumer start..."); consumer.start(); } }

测试结果 生产者
RocketMQ——RocketMQ搭建及问题解决
文章图片

消费者
RocketMQ——RocketMQ搭建及问题解决
文章图片

从控制台可以看到,在消费者端显示消息发送成功的数据,sendStatus均为ok,消费者端显示读取出来对应的消息,集群搭建成功
可视化管理平台 RocketMQ拥有一个可视化的管理平台,可以通过图形界面的方式查看集群情况、topic、生产者、消费者等具体信息
地址: RocketMQ扩展,项目拉下来后进入目录rocketmq-console
项目使用springboot搭建,在配置文件application.properties中修改配置rocketmq.config.namesrvAddr
rocketmq.config.namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876

启动项目,运行App.java。访问localhost:8080即可进入管理平台
【RocketMQ——RocketMQ搭建及问题解决】RocketMQ——RocketMQ搭建及问题解决
文章图片

RocketMQ——RocketMQ搭建及问题解决
文章图片

RocketMQ——RocketMQ搭建及问题解决
文章图片

    推荐阅读