微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)


微服务23_服务异步通信01:消息可靠性

  • 前言:消息队列在使用过程中,面临着很多实际问题需要思考:
  • 本文章是: 消息的可靠性
  • 一、生产者消息确认【确保生产者发送成功】
    • 1.生产者消息确认【确保消息投递到MQ当中】
    • 引入demo以及开启rabbitMQ
      • 1.修改配置
      • 2.定义ReturnCallback回调.【消息到达了交换机但是路由到队列的过程中失败了】全局唯一
      • 3.定义ConfirmCallback.【消息没有到到达交换机】
    • 测试:
        • 1. 测试消息发送成功:
        • 2. 测试:在交换机发送失败:
        • 3.测试:发送到了交换机,但是发送队列时,发送失败了。
  • 二、消息持久化(确保消息在MQ中保存)
      • 1. 交换机如何设定持久化:
      • 2.队列如何设定持久化:
      • 3.消息如何设定持久化
  • 三、消费者消息确认【确保消费者消费消息】
      • 演示:auto模式:
  • 四、消费失败重置机制【确保消费者消费消息】
      • 1.本地重试
      • 2.失败策略
      • RepublishMessageRecoverer失败后重新投递到指定交换机中。
      • 总结

利用springAMQP进行收发消息是最基本的功能,因为在收发消息过程当中,会遇到很多问题需要解决。这一章就是来学习RabbitMQ的高级特性
前言:消息队列在使用过程中,面临着很多实际问题需要思考:
  1. 消息可靠性问题:确保发送的消息至少被消费一次。当发送消息后,MQ一定保证投递到消费者,而且被消费掉。
  2. 延迟消息的问题:实现消息的延迟投递。业务需求,例如BOSS预约腾讯会议,半个小时后通知所有的人。
  3. 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题。。例如在高并发场景下,消息的发送越来越多,消费者忙不过来。那么MQ能否保存数百万的消息。
  4. 高可用问题:应避免单点的MQ故障,避免不可以问题。单机模式,一旦出现故障,整个服务就不用了。搭建集群、集群同步、集群通信。
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

第一节:消息可靠性问题
第二节:延迟消息问题
第三节:消息堆积问题
第四节:高可用问题:普通、镜像、仲裁集群
本文章是: 消息的可靠性 如何确保RabbitMQ消息的可靠性?
三个角度来讲:
  1. 生产者消息弄丢
  2. MQ消息弄丢
  3. 消费者把消息弄丢
  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack。重试次数耗尽后,会抛弃消息。
  • 开启消费者失败重试机制,并设置MessageRecoverer,重试次数耗尽后,后将消息投递到异常交换机,交由人工处理
消息从发送到接收,都有哪些流程呢?
在RabbitMQ当中,。
第一步:发送者将消息投递给交换机【exchange】,
第二步:交换机会根据路由K,路由到队列,
第三步:队列再把消息投递给消费者。
消息在以上三步都可能发生丢失:
  1. 在发送的过程中丢失数据:
    • 生产者发送的消息未到达交换机
    • 消息到达交换机后未到达队列
有网络传输,就有可能丢失数据
只有将消息保存到队列,才能叫消息发送成功。
  1. MQ宕机,队列消息丢失
如果此时MQ发生宕机,而MQ又是内存存储,宕机后所有数据会全部丢失,从而消息肯定也会丢失。所以说MQ也有可能把消息丢失
  1. 消费者接收到了消息,未消费就宕机
消费者也可能发生宕机,消费者接收了消息,还没来及消费消息,就发生了宕机,那么消息从而也就会丢失了。
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

一、生产者消息确认【确保生产者发送成功】 1.生产者消息确认【确保消息投递到MQ当中】 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

引入demo以及开启rabbitMQ 微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

在Linux中通过docker,启动rabbitMQ:
[root@localhost ~]# docker run-e RABBITMQ_DEFAULT_USER=itcast-e RABBITMQ_DEFAULT_PASS=123321--name mq--hostname mq1-p 15672:15672-p 5672:5672-drabbitmq:3-management 08c0271a8255b9383f054c5436859b06e29525b8fadd9895bff286d66cbbdd05 [root@localhost ~]# docker ps CONTAINER IDIMAGECOMMANDCREATEDSTATUSPORTSNAMES 08c0271a8255rabbitmq:3-management"docker-entrypoint.s…"4 seconds agoUp 2 seconds4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcpmq

1.修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true

说明:
  • publish-confirm-type:开启publisher-confirm。生产者确认的类型。这里支持两种类型:
    • simple:同步等待confirm结果,直到超时。可能导致代码的阻塞,不推荐。
    • correlated:异步回调,定义ConfirmCallback。发送完消息,不等待,而是当MQ返回结果时会回调这个ConfirmCallback。
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。那么在发送到交换机、队列的过程中出了问题,有可能会返回结果。如果需要返回结果需要:teplate mandatory : true
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback,从而才能看到路由消息失败的原因;false:则直接丢弃消息
2.定义ReturnCallback回调.【消息到达了交换机但是路由到队列的过程中失败了】全局唯一
上述在yml文件中定义了,回调机制。那么就需要编写回调机制的函数:
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
RabbitTemplate是由spring来创建的,所以说RabbitTemplate是一个单例Bean。
而该类只能配置一个ReturnCallback,所以咱们不能每次发送消息来配置。那么我们就需要在项目启动时为RabbitTemplate配置一个return callback。从而能达到全局生效的问题。
在spring生命周期中:Aware 是一个通知接口
applicationContext是springBean的容器。 在spring中所有的bean都是放在applicationContext中。
那么:ApplicationContextAware就是Bean容器的通知。也是Bean工厂的通知,意思是:当SpringBean工厂准备好了以后,它会通知实现的该接口的类。实现了该接口,要重写方法:setApplicationContext方法(参数:)。当通知的的时候会把spring容器传递过来。 既然拿到了工厂,就可以在该工程中取出想要的Bean了。 从而可以操作该Bean。
这个该类是在Bean工厂创建完了以创建,从而项目启动时就会创建该类。从而callback是全局的
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

其中是一个lambad表达式,本质是匿名内部类,其中有5个参数。{大括号里面是业务逻辑}
现在是路由消息失败了,会有一个回值,回值中有:
message:1. 发送的消息是什么
replyCode:2. 失败的状态码
replyCode:3. 失败的原因
exange:4. 投递到了哪一个交换机
routingKey:5. 投递时用的是哪一个routingKey
现在可以利用五个参数可以:
  1. 进行记录日志。
    2.可以通过消息体、交换机、路由进行重发消息。通过交换机、路由key重新发送消息体
修改publisher服务,添加一个:
package cn.itcast.mq.config; import com.rabbitmq.client.ReturnCallback; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { // Bean工厂的通知。当bean工厂准备好了以后,就会通知该类 // 全局的配置:ReturnCallback @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置returnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 记录日志: 依次填写到占位符中。 log.error("消息发送到队列失败, 响应码{} ,失败原因{},交换机{} ,路由key{},消息:{}",replyCode,replyText,exchange ,routingKey,message); }); } }

3.定义ConfirmCallback.【消息没有到到达交换机】
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
confiremCalback并没有要求RabbitTemblate只有唯一的一个confiremCalback。每次发送消息可以写不同的confiremCalback,去有不同的业务方案。所以是在发送消息的过程中进行添加。
  1. 准备消息
    correlationData: 【消息的唯一ID、callback】
    ID: 用的是UUID,确保每一个消息都是不唯一的id
    callback就是confirmCallback。:第一:获取一个将来的对象,因为现在只是发送消息,发完以后等待将来的某一刻拿到回调。第二然后在进行添加callback。
    其中:result:成功的回调函数。接收到RabbitMQ的回值就是成功回调。分为两部分:ack / nack
    ex:是失败的回调函数。 什么时候会出现失败回调呢?在发送消息过程中,不知道为什么抛出了异常,导致回调都没有收到
  2. 发送消息。利用rabbitTemplage.转换并且发送(交换机的名字、routingKey的名称、消息体、【消息的唯一ID、callback】)
    correlationData:在配置文件中有一个【publisher-confirm-type: correlated】
package cn.itcast.mq.spring; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.FailureCallback; import org.springframework.util.concurrent.SuccessCallback; import java.util.UUID; @Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() throws InterruptedException { //String routingKey = "simple.queue"; // 1. 准备发送消息 String message = "hello, spring amqp!"; // 2. 准备失败回调 CorrelationData correlationData = https://www.it610.com/article/new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(confirm -> { if (confirm.isAck()){ log.debug("消息成功投递到交换机:消息id{}",correlationData.getId()); }else{ // nack log.error("消息还没有投递到交换机,就失败了:消息id{}",correlationData.getId()); } }, throwable -> { log.error("消息发送失败:",throwable); }); //3. 发送消息:交换机、路由Key、消息、ConfirmCallback发消息那一刻去做回调 //rabbitTemplate.convertAndSend("amq.topic","simple.queue", message,correlationData); // 将交换机的名字写错: //rabbitTemplate.convertAndSend("amq.topic","simple.queue", message,correlationData); // 将队列的名字写错:这样成功投递到交换机,但是队列投递失败: //rabbitTemplate.convertAndSend("amq.topic","aaaa.simple.queue", message,correlationData); } }

测试:
  1. 手动添加队列
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
  2. 绑定交换机和队列:
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
1. 测试消息发送成功:
  1. 发送消息成功了:
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
2. 测试:在交换机发送失败: 微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

3.测试:发送到了交换机,但是发送队列时,发送失败了。 微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

二、消息持久化(确保消息在MQ中保存) 生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
  • 交换机持久化
  • 队列持久化
  • 消息持久化
在springAMQP中,交换机、队列、消息默认都是持久的。
-----…
交换机和队列在consumer(消费者)中创建时,默认是持久的
消息在publisher(出版、生产者)中创建时,默认是持久的
默认的为什么还要学习持久化:
因为持久化要写磁盘,并不是所有的业务都要进行持久化。
1. 交换机如何设定持久化:
@Configuration public class CommonConfig {// 设置交换机: @Bean public DirectExchange exchange(){ // 参数: 交换机的名称、是否持久化、当没有与Queue队列绑定时,是否自动删除 return new DirectExchange("simple.direct",true,false); } }

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

  • 查看原码持久化:微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
2.队列如何设定持久化:
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

  • 查看原码持久化:微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
3.消息如何设定持久化
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

  • 查看原码持久化:
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片
三、消费者消息确认【确保消费者消费消息】 RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费后消费者消费后会立即删除
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,还未来及进行消费消息。从而消息在消费者中丢失了
    这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
由此可知:
  • none模式下:消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq,默认会立马重发消息;没有异常,返回ack。
  • manual:自己根据业务请,判断什么时候该ack。
一般,都是使用默认的auto即可。
演示:auto模式:
  1. 添加依赖:
    因为这是在确保消费者能收到消息,并且能消费消息。所以应该在consumer消费者中添加消息确认机制:
    修改consumer服务的application.yml文件,添加下面内容:
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 关闭ack

  1. 在consumer服务下:进行监听消息,并且接受消息的方法中:模拟一个消息处理异常:
  2. 在队列中添加一个消息:
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片

    可以看到有一个消息在队列中了:
    微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
    文章图片

那我们假设:接受到了消息,在还没有处理消息时,抛出异常(模拟服务宕机了),由于我们开启了aotu消息确认,那么消息队列会一直保留消息,那么也会一直重发消息:
现在是:消息队列在发送完消息后,在等待消息确认,而其实服务器已经发生了异常。
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

四、消费失败重置机制【确保消费者消费消息】 默认的失败重试机制:无限制的:MQ的队列将消息发送给消费者,消费者把消息重新投递给MQ的队列【返回NACK】
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。【不返回ACK,也不返回NACK,在本地进行尝试,尝试到成功、或者尝试的上限为止。】 在采取策略:1. 再把消息投递给MQ,人工介入。2.直接抛弃。
修改consumer服务的application.yml文件,添加内容:
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

重启consumer服务,重复之前的测试。可以发现:
  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

结论:
  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃
2.失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式,队列和消费者快速的互推
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。频率稍微比上面默认的情况低一些,在本地测试了以后,在返回队列。
  • 推荐:RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。(republish重新发布)
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

RepublishMessageRecoverer失败后重新投递到指定交换机中。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }

2)定义一个RepublishMessageRecoverer,关联队列和交换机
按着springBoot自动装配的原理,想覆盖spring的默认配置,只需要自己定义一个Bean就能覆盖了。 所以说:自己定义一个Bean:MessageRecoverer (消息回收站)就会覆盖默认的:微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

创建new我们要遵循的策略RepublishMessageRecoverer是重发,
参数一:由于重发需要rabbitTemplate.。
参数二和三: 既然重发,指定交换机和路由routingKey(路由:路由和队列匹配就能发送成功。)
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }

完整代码:
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; @Configuration public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue , DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }

微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

不光有错误的消息,并且还有消费者的错误信息带过来了:
微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)
文章图片

总结
如何确保RabbitMQ消息的可靠性?
【微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)】三个角度来讲:
  1. 生产者消息弄丢
  2. MQ消息弄丢
  3. 消费者把消息弄丢
  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack。重试次数耗尽后,会抛弃消息。
  • 开启消费者失败重试机制,并设置MessageRecoverer,重试次数耗尽后,后将消息投递到异常交换机,交由人工处理

    推荐阅读