SpringBoot 整合 RabbitMQ 实现消息可靠传输

消息的可靠传输是面试必问的问题之一,保证消息的可靠传输主要在生产端开启 comfirm 模式,RabbitMQ 开启持久化,消费端关闭自动 ack 模式。
环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送。
  • 添加 maven 依赖
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp

  • 添加 application.yml 配置文件
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx

  • 配置交换机、队列以及绑定
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; }@Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; }@Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }

  • 生产发送消息
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }

  • 消费者接收消息
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);

  • 调用生产端发送消息 hello,控制台输出:
【发送消息】hello 【接收信息】hello 当前时间2022-05-12 10:21:14

说明消息已经被成功接收。
消息丢失分析 SpringBoot 整合 RabbitMQ 实现消息可靠传输
文章图片

一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:
  • 生产端丢失: 生产者无法传输到 RabbitMQ
  • 存储端丢失: RabbitMQ 存储自身挂了
  • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费
RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。
生产阶段 生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。
  • 配置 application.yml
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制交换机 -> 队列 publisher-returns: true

配置
@Configuration @Slf4j public class RabbitConfig {@Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }

消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法 confirm(CorrelationData correlationData, boolean ack, String cause) ,根据 ack 判断消息是否发送成功。
消息从 交换机 到 队列,有returnCallback 退回模式。
发送消息 product message 控制台输出如下:
【发送消息】product message 【接收信息】product message 当前时间2022-05-12 11:27:56 【correlationData】:null 【ack】true 【cause】null 【发送成功】

生产端模拟消息丢失
这里有两个方案:
  1. 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
  2. 发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

结果:
【correlationData】:null 【ack】false 【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40) 【发送失败】

当发送失败可以对消息进行重试
  1. 交换机正确,发送不存在的队列:
交换机接收到消息,返回成功通知,控制台输出:
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c] 【ack】true 【cause】null 【发送成功】

交换机没有找到队列,返回失败信息:
【消息发送失败】 【message】product message 【replyCode】312

RabbitMQ 开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里。
修改队列的持久化,修改成非持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }

发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
设置队列持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }

重启之后,队列的消息还存在。
消费端 消费端默认开始 ack 自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:
修改 application.yml 文件
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual

消费接收消息之后需要手动确认:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); }}

如果不添加:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

发送两条消息
消息被接收后,没有确认,重新放到队列中:
SpringBoot 整合 RabbitMQ 实现消息可靠传输
文章图片

重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。
【SpringBoot 整合 RabbitMQ 实现消息可靠传输】加上 channel.basicAck 之后,再重启项目:
SpringBoot 整合 RabbitMQ 实现消息可靠传输
文章图片

队列消息就被删除了
basicAck 方法最后一个参数 multiple 表示是删除之前的队列。
multiple 设置成 true,把后面的队列都清理掉了:
SpringBoot 整合 RabbitMQ 实现消息可靠传输
文章图片

源码 https://github.com/jeremylai7/springboot-learning/tree/master/spring-rabbitmq
如果觉得文章对你有帮助的话,请点个赞吧!

    推荐阅读