简介:
基于windows 下的使用教程
一、启动服务端
1-下载安装rocket https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
2-配置环境变量:
ROCKETMQ_HOME =(解压位置) D:\rocket\rocketmq-all-4.9.0-bin-release
3-启动 mqnamesrv 服务
进入 D:\rocket\rocketmq-all-4.9.0-bin-release\bin 下 执行 mqnamesrv.cmd 启动服务
或黑窗口执行 .\mqnamesrv 命令
正常启动显示:
文章图片
异常处理:
1)如果一闪而过 重启电脑试试
2)启动后如果报错“无法加载类 xxx”的话
a)把java_home 的空格去掉 比如 program Files 或者直接改成没有空格的路径下
b)runbroker.cmd 和 runserver.cmd 文件下 classpath添加英文双引号
文章图片
4-启动 broker 服务 进入安装目录bin下执行以下命令 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=ture
或执行 .\mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
正常显示:
文章图片
5-启动rocketmq后台(可选)
```java
1)下载console代码
链接: https://pan.baidu.com/s/1ee80ppXmJ_IY8R7FefThjA 提取码: zhfy
2)修改D:\rocket\rocketmq-externals-master\rocketmq-console\src\main\resources\application.properties 文件
文章图片
3)启动项目a-重新打包
在\rocketmq-console项目下mvn clean package -Dmaven.test.skip=true
b-启动项目
在\rocketmq-console\target目录下 执行java -jar rocketmq-console-ng-1.0.0.jar
4)访问后台
localhost:serverPort
http://localhost:8090/
二、编写服务端代码 项目代码:
https://github.com/lixiao04/RocketMq.git
1-集成springmvc
1-pom引入
org.apache.rocketmq
rocketmq-client
4.3.0
2-生产者注入
package com.example.demo.rocketmq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
@Component
@Slf4j
public class Producer {
@Value(value = "https://www.it610.com/article/${rocketmq.namesrv}")
privateString nameservAddr;
@Value(value = "https://www.it610.com/article/${rocketmq.group}")
privateString group;
@Value(value = "https://www.it610.com/article/${rocketmq.topic}")
privateString topic;
@Value(value = "https://www.it610.com/article/${rocketmq.tag}")
privateString tag;
privateDefaultMQProducer producer = null;
@PostConstruct
publicvoid Producer() throws MQClientException {
producer = new DefaultMQProducer(group);
producer.setNamesrvAddr(nameservAddr);
producer.setInstanceName("producer");
producer.setRetryTimesWhenSendFailed(3);
producer.start();
log.info("生产者启动成功!");
}public DefaultMQProducer getProducer() {
return producer;
}
}3-消费者注入
package com.example.demo.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Component
@Slf4j
public class Consumer {
@Value(value = "https://www.it610.com/article/${rocketmq.namesrv}")
privateString nameservAddr;
@Value(value = "https://www.it610.com/article/${rocketmq.group}")
privateString group;
@Value(value = "https://www.it610.com/article/${rocketmq.topic}")
privateString topic;
@Value(value = "https://www.it610.com/article/${rocketmq.tag}")
privateString tag;
DefaultMQPushConsumer consumer=null;
@PostConstruct
publicvoid consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(nameservAddr);
consumer.setConsumerGroup(group);
consumer.setInstanceName("consumer");
consumer.subscribe(topic, tag);
//第一次是从对头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//消费者一次最多可消费多少条
consumer.setConsumeMessageBatchMaxSize(100);
//集群模式消费失败默认重试16次,延迟等级为3~18。(messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h")
consumer.setMessageModel(MessageModel.CLUSTERING);
//最多重试次数
consumer.setMaxReconsumeTimes(6);
consumer.registerMessageListener(new MessageListenerConcurrently()
{
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
//获取重试次数
int reconsumeTimes = msg.getReconsumeTimes();
//获取topic
String topic1 = msg.getTopic();
//获取tag
String tags = msg.getTags();
System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("消费者启动成功");
}
}
4-代码中调用
文章图片
【RocketMq使用教程】注意rocketmq服务端的版本和客户端的版本不一致也会无法创建topic
也就是当初你下载的客户端是哪个版本的,需要在服务端引入相应版本的jar包,否则设置了 autoCreateTopicEnable=true 也会无法自动创建新的topic
文章图片
推荐阅读
- RocketMQ高可用设计之消息重试机制
- 详解文件IO系列讲讲 MQ 消息中间件 (Kafka,RocketMQ等)与MMAPPageCache 的故事...
- 关于 RocketMQClientID 相同引发的消息堆积的问题
- Alibaba中间件技术「RocketMQ专题」探索DefaultMQPushConsumer
- RocketMQ(消息存储机制详解与源码解析)
- RocketMQ报错
- 10分钟认识RocketMQ!想进阿里连这个都不会()
- 一次 RocketMQ 顺序消费延迟的问题定位
- RocketMQ基础概念剖析&源码解析