SpringBoot整合MQTT并实现异步线程调用的问题

目录

  • 为什么选择MQTT
  • 使用背景
  • 代码实现
    • 基础代码
    • 异步线程处理实现

为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来
先从使用MQTT需要什么开始分析:
  • 消息服务器
  • 不同应用/设备之间的频繁交互
  • 可能涉及一对多的消息传递
基于SpringBoot通过注解实现对mqtt消息处理的异步调用

使用背景
生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消费者去快速的消费.
而其中的一个方案便是使用异步线程去加速消费消息. 下面介绍下思路
我们可以在原来的mqtt工具类上面进行改装.
首先创建一个类MqttMessageListener并继承IMqttMessageListener实现messageArrived, 用于处理这些消息(业务编写)
然后改写mqtt客户端订阅的方法, 注入MqttMessageListener, 并在订阅方法中新增该参数
在然后在启动类开启异步线程, 编写一个配置类配置线程池参数并且在messageArrived加上@Async开启异步线程调用

代码实现
基础代码
指没有开启线程池的代码
MqttPushClient 主要定义了连接参数
import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * @Author * @Date * @Description连接至EMQ X 服务器,获取mqtt连接,发布消息 */@Componentpublic class MqttPushClient{private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); @Autowiredprivate PushCallback pushCallback; private static MqttClient client; public static void setClient(MqttClient client) {MqttPushClient.client = client; }public static MqttClient getClient() {return client; }public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List topicList) {MqttClient client; try {client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (username != null) {options.setUserName(username); }if (password != null) {options.setPassword(password.toCharArray()); }options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try {//设置回调类client.setCallback(pushCallback); //client.connect(options); IMqttToken iMqttToken = client.connectWithResult(options); boolean complete = iMqttToken.isComplete(); log.info("MQTT连接"+(complete?"成功":"失败")); /** 订阅主题 **/for (String topic : topicList) {log.info("连接订阅主题:{}", topic); client.subscribe(topic, 0); }} catch (Exception e) {e.printStackTrace(); }} catch (Exception e) {e.printStackTrace(); }}}

PushCallback 回调类, 实现重连, 消息发送监听, 消息接收监听
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Author * @Date * @Description消息回调,处理接收的消息 */@Componentpublic class PushCallback implements MqttCallback {private static final Logger log = LoggerFactory.getLogger(PushCallback.class); @Autowiredprivate MqttConfiguration mqttConfiguration; @Autowiredprivate MqttTopic mqttTopic; @Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,正在重连"); MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient(); if (null != mqttPushClient) {mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(),mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic()); log.info("已重连"); }}/*** 发送消息,消息到达后处理方法* @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {int messageId = token.getMessageId(); String[] topics = token.getTopics(); log.info("消息发送完成,messageId={},topics={}",messageId,topics.toString()); }/*** 订阅主题接收到消息处理方法* @param topic* @param message*/@Overridepublic void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息会执行到这里面,这里在控制台有输出String messageStr = new String(message.getPayload()); // messageDistribute.distribute(topic, messageStr); log.info("接收的主题:" + topic +"; 接收到的信息:" + messageStr); }}

MqttConfiguration 配置了mqtt相关参数, 并初始化连接(mqtt在这里启动)
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.List; /** * @Author * @Date mqtt配置及连接 * @Description */@Slf4j@Component@Configuration@ConfigurationProperties(MqttConfiguration.PREFIX)public class MqttConfiguration {@Autowiredprivate MqttPushClient mqttPushClient; /*** 指定配置文件application-local.properties中的属性名前缀*/public static final String PREFIX = "std.mqtt"; private String host; private String clientId; private String userName; private String password; private int timeout; private int keepAlive; private List topic; public String getClientid() {return clientId; }public void setClientid(String clientid) {this.clientId = clientid; }public String getUsername() {return userName; }public void setUsername(String username) {this.userName = username; }public String getPassword() {return password; }public void setPassword(String password) {this.password = password; }public int getTimeout() {return timeout; }public void setTimeout(int timeout) {this.timeout = timeout; }public int getKeepalive() {return keepAlive; }public void setKeepalive(int keepalive) {this.keepAlive = keepalive; }public String getHost() {return host; }public void setHost(String host) {this.host = host; }public List getTopic() {return topic; }public void setTopic(List topic) {this.topic = topic; }/*** 连接至mqtt服务器,获取mqtt连接* @return*/@Beanpublic MqttPushClient getMqttPushClient() {//连接至mqtt服务器,获取mqtt连接mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic); return mqttPushClient; }}properties.yml 配置文件std.mqtt:host: tcp://x.x.x.x:1883username: your_usernamepassword: your_password#MQTT-连接服务器默认客户端IDclientid: your_clientid#连接超时timeout: 1000# deviceIddeviceId: your_deviceId# mqtt-topictopic[0]: your_tpoic

TopicOperation 定义了发布订阅的方法
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** * @Author chy */public class TopicOperation {private static final Logger log = LoggerFactory.getLogger(TopicOperation.class); /*** 订阅主题* @param topic 主题名称*/public static void subscribe(String topic) {try {MqttClient client = MqttPushClient.getClient(); if (client == null) {return; }; client.subscribe(topic, 0); log.info("订阅主题:{}",topic); } catch (MqttException e) {e.printStackTrace(); }}/*** 发布主题** @param topic* @param pushMessage*/public static void publish(String topic, String pushMessage) {log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage); MqttMessage message = new MqttMessage(); message.setQos(0); // 非持久化message.setRetained(false); message.setPayload(pushMessage.getBytes()); MqttClient client = MqttPushClient.getClient(); if (client == null) {return; }; MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) {log.error("主题不存在:{}",mTopic); }try {mTopic.publish(message); } catch (Exception e) {log.error("mqtt发送消息异常:",e); }}}

定义了发布和订阅的相关主题
import com.sxd.onlinereservation.exception.BusinessException; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @Author * @Date topic名称 * @Description */@Componentpublic class MqttTopic {@Value("${std.mqtt.deviceId}")private String[] deviceId; public String getSubscribeTopic(String type){switch (type){case "appointTopic":return String.format("/v1/%s/service/appointTopic", deviceId[0]); default:throw new BusinessException("mqtt 订阅主题获取错误"); }}public String getPublishTopic(String type) {switch (type){//1.0接口立即取号发布主题case "appointTopic":return String.format("/v1/%s/service/appointTopic", deviceId[1]); default:throw new BusinessException("mqtt 发布主题获取错误"); }}}

ps: 如果想要使用该工具类进行消息发送和接收看下面demo
//消息发布操作 TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息体")); //消息订阅操作TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"), "消息体"));


异步线程处理实现
总结
  • 创建消息监听类 , 用于监听消息并进行业务处理
  • 在原来订阅时, 注入并使用第一步创建的监听类
  • 通过注解开启异步线程并配置处理方式
创建消息监听类 , 用于监听消息并进行业务处理
@Slf4j@Componentpublic class MqttMessageListener implements IMqttMessageListener {@Resourceprivate BusinessService businessService; @Autowiredprivate MqttTopic mqttTopic; @Autowiredprivate ThreeCallmachineService threeCallmachineService; @Autowiredprivate BusinessHallService businessHallService; @Autowiredprivate BusinessMaterialService businessMaterialService; @Autowiredprivate BusinessWaitService businessWaitService; @Autowiredprivate AppointmentService appointmentService; @Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String messageStr = new String(message.getPayload()); log.info("接收的主题:" + topic +"; 接收到的信息:" + messageStr); //进行 业务处理}}

在原来订阅时, 注入并使用第一步创建的监听类
注入了 MqttMessageListener , 并且在订阅时加入 client.subscribe(topic, mqttMessageListener);
修改MqttPushClient (必须)
@Componentpublic class MqttPushClient{private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); @Autowiredprivate PushCallback pushCallback; @Autowired//这里进行了注入操作private MqttMessageListener mqttMessageListener; private static MqttClient client; public static void setClient(MqttClient client) {MqttPushClient.client = client; }public static MqttClient getClient() {return client; }public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List topicList) {MqttClient client; try {client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (username != null) {options.setUserName(username); }if (password != null) {options.setPassword(password.toCharArray()); }options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try {//设置回调类client.setCallback(pushCallback); //client.connect(options); IMqttToken iMqttToken = client.connectWithResult(options); boolean complete = iMqttToken.isComplete(); log.info("MQTT连接"+(complete?"成功":"失败")); /** 订阅主题 **/for (String topic : topicList) {log.info("连接订阅主题:{}", topic); //client.subscribe(topic, 0); client.subscribe(topic, mqttMessageListener); }} catch (Exception e) {e.printStackTrace(); }} catch (Exception e) {e.printStackTrace(); }}}

如果业务还使用了手动订阅, 则也需要在订阅的类上面注入MqttMessageListener , 并且在订阅方法中作为参数使用. 但是我们需要将方法改成非静态的, 因此在使用该方法时我们需要new该对象然后才能够调用. 但是手动订阅很少用到. 因此有无此步骤都可
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** * @Author chy * @Date * @Description */public class TopicOperation {private static final Logger log = LoggerFactory.getLogger(TopicOperation.class); //注入MqttMessageListener@Autowiredprivate MqttMessageListener mqttMessageListener; /*** 订阅主题* @param topic 主题名称*/public void subscribe(String topic) {try {MqttClient client = MqttPushClient.getClient(); if (client == null) {return; }; //client.subscribe(topic, 0); //在订阅方法中作为参数使用client.subscribe(topic, mqttMessageListener); log.info("订阅主题:{}",topic); } catch (MqttException e) {e.printStackTrace(); }}/*** 发布主题** @param topic* @param pushMessage*/public static void publish(String topic, String pushMessage) {log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage); MqttMessage message = new MqttMessage(); message.setQos(0); // 非持久化message.setRetained(false); message.setPayload(pushMessage.getBytes()); MqttClient client = MqttPushClient.getClient(); if (client == null) {return; }; MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) {log.error("主题不存在:{}",mTopic); }try {mTopic.publish(message); } catch (Exception e) {log.error("mqtt发送消息异常:",e); }}}

通过注解开启异步线程并配置处理方式 启动类开启 @EnableAsync(proxyTargetClass=true )
@SpringBootApplication@MapperScan(basePackages = "com.x.x.mapper")@EnableTransactionManagement@EnableAsync(proxyTargetClass=true )public class XXApplication {public static void main(String[] args) {SpringApplication.run(XXApplication.class, args); }}

配置类配置线程池参数
@Slf4j@Configurationpublic class ExecutorConfig {@Beanpublic Executor asyncServiceExecutor() {log.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数executor.setCorePoolSize(9); //配置最大线程数executor.setMaxPoolSize(20); //配置队列大小executor.setQueueCapacity(200); //配置线程池中的线程的名称前缀executor.setThreadNamePrefix("sxd-async-service-"); // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化executor.initialize(); return executor; }}

MqttMessageListener的实现方法messageArrived开启@Async("asyncServiceExecutor")
@Slf4j@Componentpublic class MqttMessageListener implements IMqttMessageListener {@Resourceprivate BusinessService businessService; @Autowiredprivate MqttTopic mqttTopic; @Autowiredprivate ThreeCallmachineService threeCallmachineService; @Autowiredprivate BusinessHallService businessHallService; @Autowiredprivate BusinessMaterialService businessMaterialService; @Autowiredprivate BusinessWaitService businessWaitService; @Autowiredprivate AppointmentService appointmentService; @Override@Async("asyncServiceExecutor")public void messageArrived(String topic, MqttMessage message) throws Exception {String messageStr = new String(message.getPayload()); log.info("接收的主题:" + topic +"; 接收到的信息:" + messageStr); System.out.println("线程名称:【" + Thread.currentThread().getName() + "】"); //进行 业务处理}}

【SpringBoot整合MQTT并实现异步线程调用的问题】到此这篇关于SpringBoot整合MQTT并实现异步线程调用的文章就介绍到这了,更多相关SpringBoot异步线程调用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    推荐阅读