RabbitMQ|RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

目录

  • 1. 概述
  • 2. 场景说明
  • 3. 与Springboot的整合
    • 3.1 引入依赖
    • 3.2 生产服务配置
    • 3.3 生产服务代码
    • 3.4 消费服务配置
    • 3.5 消费服务代码
    • 3.6 Rest 测试代码
  • 4. 综述

    1. 概述 老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了。
    言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合。
    【RabbitMQ|RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合】
    2. 场景说明 服务器A IP:192.168.1.22
    服务器B IP:192.168.1.8
    服务器C IP:192.168.1.144
    此三台服务器上已搭建好了 RabbitMQ镜像模式集群,镜像模式集群的搭建,可参见我的上一篇文章。

    3. 与Springboot的整合
    3.1 引入依赖
    org.springframework.bootspring-boot-starter-parent2.5.5

    org.springframework.bootspring-boot-starter-amqp


    3.2 生产服务配置
    spring:rabbitmq:addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672username: guestpassword: guestvirtual-host: /connection-timeout: 16000# 启用消息确认模式publisher-confirm-type: correlated# 启用 return 消息模式publisher-returns: truetemplate:mandatory: true


    3.3 生产服务代码
    import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.util.Map; @Componentpublic class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate; /*** 确认回调*/final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// correlationData 唯一标识// ack mq是否收到消息// cause 失败原因System.out.println("correlationData:" + correlationData.getId()); System.out.println("ack:" + ack); System.out.println("cause:" + cause); }}; /*** 发送消息* @param messageBody消息体* @param headers附加属性* @throws Exception*/public void sendMessage(String messageBody, Map headers, String id) throws Exception {MessageHeaders messageHeaders = new MessageHeaders(headers); Message message = MessageBuilder.createMessage(messageBody, messageHeaders); rabbitTemplate.setConfirmCallback(confirmCallback); String exchangeName = "exchange-hello"; String routingKey = "test.123"; CorrelationData correlationData = https://www.it610.com/article/new CorrelationData(id); rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {/*** 发送消息后做的事情* @param message* @return* @throws AmqpException*/@Overridepublic org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {return message; }}, correlationData); }}


    3.4 消费服务配置
    spring:rabbitmq:addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672username: guestpassword: guestvirtual-host: /connection-timeout: 16000listener:simple:# 设置为手工ACKacknowledge-mode: manualconcurrency: 5prefetch: 1max-concurrency: 10


    3.5 消费服务代码
    import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Componentpublic class Consumer {@RabbitListener(bindings = @QueueBinding(value = https://www.it610.com/article/@Queue(value ="queue-hello", durable = "true"),exchange = @Exchange(value = "https://www.it610.com/article/exchange-hello" , durable = "true", type = "topic"),key = "test.*"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("收到消息:" + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); }}


    3.6 Rest 测试代码
    @RestController@RequestMapping("/mq")public class RabbitmqController {@Autowiredprivate Producer producer; @GetMapping("/sendMessage")public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {Map headers = new HashMap<>(); producer.sendMessage(messageBody, headers, id); return "success"; }}


    4. 综述 到此这篇关于RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合的文章就介绍到这了,更多相关RabbitMQ镜像模式集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

      推荐阅读