RocketMq使用教程

简介:
基于windows 下的使用教程 一、启动服务端 1-下载安装rocket https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
2-配置环境变量:

ROCKETMQ_HOME =(解压位置) D:\rocket\rocketmq-all-4.9.0-bin-release

3-启动 mqnamesrv 服务
进入 D:\rocket\rocketmq-all-4.9.0-bin-release\bin 下 执行 mqnamesrv.cmd 启动服务 或黑窗口执行 .\mqnamesrv 命令

正常启动显示:
RocketMq使用教程
文章图片

异常处理:
1)如果一闪而过 重启电脑试试
2)启动后如果报错“无法加载类 xxx”的话
a)把java_home 的空格去掉 比如 program Files 或者直接改成没有空格的路径下 b)runbroker.cmd 和 runserver.cmd 文件下 classpath添加英文双引号

RocketMq使用教程
文章图片

4-启动 broker 服务 进入安装目录bin下执行以下命令 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=ture
或执行 .\mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
正常显示:
RocketMq使用教程
文章图片

5-启动rocketmq后台(可选)
```java 1)下载console代码 链接: https://pan.baidu.com/s/1ee80ppXmJ_IY8R7FefThjA 提取码: zhfy 2)修改D:\rocket\rocketmq-externals-master\rocketmq-console\src\main\resources\application.properties 文件

RocketMq使用教程
文章图片

3)启动项目a-重新打包 在\rocketmq-console项目下mvn clean package -Dmaven.test.skip=true b-启动项目 在\rocketmq-console\target目录下 执行java -jar rocketmq-console-ng-1.0.0.jar 4)访问后台 localhost:serverPort http://localhost:8090/

二、编写服务端代码 项目代码:
https://github.com/lixiao04/RocketMq.git
1-集成springmvc
1-pom引入 org.apache.rocketmq rocketmq-client 4.3.0 2-生产者注入 package com.example.demo.rocketmq; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.UnsupportedEncodingException; @Component @Slf4j public class Producer { @Value(value = "https://www.it610.com/article/${rocketmq.namesrv}") privateString nameservAddr; @Value(value = "https://www.it610.com/article/${rocketmq.group}") privateString group; @Value(value = "https://www.it610.com/article/${rocketmq.topic}") privateString topic; @Value(value = "https://www.it610.com/article/${rocketmq.tag}") privateString tag; privateDefaultMQProducer producer = null; @PostConstruct publicvoid Producer() throws MQClientException { producer = new DefaultMQProducer(group); producer.setNamesrvAddr(nameservAddr); producer.setInstanceName("producer"); producer.setRetryTimesWhenSendFailed(3); producer.start(); log.info("生产者启动成功!"); }public DefaultMQProducer getProducer() { return producer; } }3-消费者注入 package com.example.demo.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.List; @Component @Slf4j public class Consumer { @Value(value = "https://www.it610.com/article/${rocketmq.namesrv}") privateString nameservAddr; @Value(value = "https://www.it610.com/article/${rocketmq.group}") privateString group; @Value(value = "https://www.it610.com/article/${rocketmq.topic}") privateString topic; @Value(value = "https://www.it610.com/article/${rocketmq.tag}") privateString tag; DefaultMQPushConsumer consumer=null; @PostConstruct publicvoid consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(group); consumer.setNamesrvAddr(nameservAddr); consumer.setConsumerGroup(group); consumer.setInstanceName("consumer"); consumer.subscribe(topic, tag); //第一次是从对头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //消费者一次最多可消费多少条 consumer.setConsumeMessageBatchMaxSize(100); //集群模式消费失败默认重试16次,延迟等级为3~18。(messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h") consumer.setMessageModel(MessageModel.CLUSTERING); //最多重试次数 consumer.setMaxReconsumeTimes(6); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { //获取重试次数 int reconsumeTimes = msg.getReconsumeTimes(); //获取topic String topic1 = msg.getTopic(); //获取tag String tags = msg.getTags(); System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); log.info("消费者启动成功"); } }

4-代码中调用
RocketMq使用教程
文章图片

【RocketMq使用教程】注意rocketmq服务端的版本和客户端的版本不一致也会无法创建topic
也就是当初你下载的客户端是哪个版本的,需要在服务端引入相应版本的jar包,否则设置了 autoCreateTopicEnable=true 也会无法自动创建新的topic
RocketMq使用教程
文章图片

    推荐阅读