微服务23_服务异步通信01:消息可靠性
- 前言:消息队列在使用过程中,面临着很多实际问题需要思考:
- 本文章是: 消息的可靠性
- 一、生产者消息确认【确保生产者发送成功】
-
- 1.生产者消息确认【确保消息投递到MQ当中】
- 引入demo以及开启rabbitMQ
-
- 1.修改配置
- 2.定义ReturnCallback回调.【消息到达了交换机但是路由到队列的过程中失败了】全局唯一
- 3.定义ConfirmCallback.【消息没有到到达交换机】
- 测试:
-
-
- 1. 测试消息发送成功:
- 2. 测试:在交换机发送失败:
- 3.测试:发送到了交换机,但是发送队列时,发送失败了。
-
- 二、消息持久化(确保消息在MQ中保存)
-
-
- 1. 交换机如何设定持久化:
- 2.队列如何设定持久化:
- 3.消息如何设定持久化
-
- 三、消费者消息确认【确保消费者消费消息】
-
-
- 演示:auto模式:
-
- 四、消费失败重置机制【确保消费者消费消息】
-
-
- 1.本地重试
- 2.失败策略
- RepublishMessageRecoverer失败后重新投递到指定交换机中。
- 总结
-
利用springAMQP进行收发消息是最基本的功能,因为在收发消息过程当中,会遇到很多问题需要解决。这一章就是来学习RabbitMQ的高级特性前言:消息队列在使用过程中,面临着很多实际问题需要思考:
- 消息可靠性问题:确保发送的消息至少被消费一次。当发送消息后,MQ一定保证投递到消费者,而且被消费掉。
- 延迟消息的问题:实现消息的延迟投递。业务需求,例如BOSS预约腾讯会议,半个小时后通知所有的人。
- 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题。。例如在高并发场景下,消息的发送越来越多,消费者忙不过来。那么MQ能否保存数百万的消息。
- 高可用问题:应避免单点的MQ故障,避免不可以问题。单机模式,一旦出现故障,整个服务就不用了。搭建集群、集群同步、集群通信。
文章图片
第一节:消息可靠性问题
第二节:延迟消息问题
第三节:消息堆积问题
第四节:高可用问题:普通、镜像、仲裁集群
本文章是: 消息的可靠性 如何确保RabbitMQ消息的可靠性?
三个角度来讲:
- 生产者消息弄丢
- MQ消息弄丢
- 消费者把消息弄丢
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack。重试次数耗尽后,会抛弃消息。
- 开启消费者失败重试机制,并设置MessageRecoverer,重试次数耗尽后,后将消息投递到异常交换机,交由人工处理
在RabbitMQ当中,。
第一步:发送者将消息投递给交换机【exchange】,
第二步:交换机会根据路由K,路由到队列,
第三步:队列再把消息投递给消费者。
消息在以上三步都可能发生丢失:
- 在发送的过程中丢失数据:
- 生产者发送的消息未到达交换机
- 消息到达交换机后未到达队列
有网络传输,就有可能丢失数据
只有将消息保存到队列,才能叫消息发送成功。
- MQ宕机,队列消息丢失
如果此时MQ发生宕机,而MQ又是内存存储,宕机后所有数据会全部丢失,从而消息肯定也会丢失。所以说MQ也有可能把消息丢失
- 消费者接收到了消息,未消费就宕机
消费者也可能发生宕机,消费者接收了消息,还没来及消费消息,就发生了宕机,那么消息从而也就会丢失了。
文章图片
一、生产者消息确认【确保生产者发送成功】 1.生产者消息确认【确保消息投递到MQ当中】 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
文章图片
引入demo以及开启rabbitMQ
文章图片
在Linux中通过docker,启动rabbitMQ:
[root@localhost ~]# docker run-e RABBITMQ_DEFAULT_USER=itcast-e RABBITMQ_DEFAULT_PASS=123321--name mq--hostname mq1-p 15672:15672-p 5672:5672-drabbitmq:3-management
08c0271a8255b9383f054c5436859b06e29525b8fadd9895bff286d66cbbdd05
[root@localhost ~]# docker ps
CONTAINER IDIMAGECOMMANDCREATEDSTATUSPORTSNAMES
08c0271a8255rabbitmq:3-management"docker-entrypoint.s…"4 seconds agoUp 2 seconds4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcpmq
1.修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm。生产者确认的类型。这里支持两种类型:simple
:同步等待confirm结果,直到超时。可能导致代码的阻塞,不推荐。correlated
:异步回调,定义ConfirmCallback。发送完消息,不等待,而是当MQ返回结果时会回调这个ConfirmCallback。
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。那么在发送到交换机、队列的过程中出了问题,有可能会返回结果。如果需要返回结果需要:teplate mandatory : truetemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback,从而才能看到路由消息失败的原因;false:则直接丢弃消息
上述在yml文件中定义了,回调机制。那么就需要编写回调机制的函数:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
RabbitTemplate是由spring来创建的,所以说RabbitTemplate是一个单例Bean。
而该类只能配置一个ReturnCallback,所以咱们不能每次发送消息来配置。那么我们就需要在项目启动时为RabbitTemplate配置一个return callback。从而能达到全局生效的问题。
在spring生命周期中:Aware 是一个通知接口
applicationContext是springBean的容器。 在spring中所有的bean都是放在applicationContext中。
那么:ApplicationContextAware就是Bean容器的通知。也是Bean工厂的通知,意思是:当SpringBean工厂准备好了以后,它会通知实现的该接口的类。实现了该接口,要重写方法:setApplicationContext方法(参数:)。当通知的的时候会把spring容器传递过来。 既然拿到了工厂,就可以在该工程中取出想要的Bean了。 从而可以操作该Bean。
这个该类是在Bean工厂创建完了以创建,从而项目启动时就会创建该类。从而callback是全局的
文章图片
其中是一个lambad表达式,本质是匿名内部类,其中有5个参数。{大括号里面是业务逻辑}修改publisher服务,添加一个:
现在是路由消息失败了,会有一个回值,回值中有:
message:1. 发送的消息是什么
replyCode:2. 失败的状态码
replyCode:3. 失败的原因
exange:4. 投递到了哪一个交换机
routingKey:5. 投递时用的是哪一个routingKey
现在可以利用五个参数可以:
- 进行记录日志。
2.可以通过消息体、交换机、路由进行重发消息。通过交换机、路由key重新发送消息体
package cn.itcast.mq.config;
import com.rabbitmq.client.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
// Bean工厂的通知。当bean工厂准备好了以后,就会通知该类
// 全局的配置:ReturnCallback
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置returnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 记录日志: 依次填写到占位符中。
log.error("消息发送到队列失败, 响应码{} ,失败原因{},交换机{} ,路由key{},消息:{}",replyCode,replyText,exchange
,routingKey,message);
});
}
}
3.定义ConfirmCallback.【消息没有到到达交换机】
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
confiremCalback并没有要求RabbitTemblate只有唯一的一个confiremCalback。每次发送消息可以写不同的confiremCalback,去有不同的业务方案。所以是在发送消息的过程中进行添加。
- 准备消息
correlationData: 【消息的唯一ID、callback】
ID: 用的是UUID,确保每一个消息都是不唯一的id
callback就是confirmCallback。:第一:获取一个将来的对象,因为现在只是发送消息,发完以后等待将来的某一刻拿到回调。第二然后在进行添加callback。
其中:result:成功的回调函数。接收到RabbitMQ的回值就是成功回调。分为两部分:ack / nack
ex:是失败的回调函数。 什么时候会出现失败回调呢?在发送消息过程中,不知道为什么抛出了异常,导致回调都没有收到
- 发送消息。利用rabbitTemplage.转换并且发送(交换机的名字、routingKey的名称、消息体、【消息的唯一ID、callback】)
correlationData:在配置文件中有一个【publisher-confirm-type: correlated】
package cn.itcast.mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
//String routingKey = "simple.queue";
// 1. 准备发送消息
String message = "hello, spring amqp!";
// 2. 准备失败回调
CorrelationData correlationData = https://www.it610.com/article/new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(confirm -> {
if (confirm.isAck()){
log.debug("消息成功投递到交换机:消息id{}",correlationData.getId());
}else{
// nack
log.error("消息还没有投递到交换机,就失败了:消息id{}",correlationData.getId());
}
}, throwable -> {
log.error("消息发送失败:",throwable);
});
//3. 发送消息:交换机、路由Key、消息、ConfirmCallback发消息那一刻去做回调
//rabbitTemplate.convertAndSend("amq.topic","simple.queue", message,correlationData);
// 将交换机的名字写错:
//rabbitTemplate.convertAndSend("amq.topic","simple.queue", message,correlationData);
// 将队列的名字写错:这样成功投递到交换机,但是队列投递失败:
//rabbitTemplate.convertAndSend("amq.topic","aaaa.simple.queue", message,correlationData);
}
}
测试:
- 手动添加队列
文章图片
- 绑定交换机和队列:
文章图片
- 发送消息成功了:
文章图片
文章图片
3.测试:发送到了交换机,但是发送队列时,发送失败了。
文章图片
二、消息持久化(确保消息在MQ中保存) 生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
在springAMQP中,交换机、队列、消息默认都是持久的。
-----…
交换机和队列在consumer(消费者)中创建时,默认是持久的
消息在publisher(出版、生产者)中创建时,默认是持久的
默认的为什么还要学习持久化:1. 交换机如何设定持久化:
因为持久化要写磁盘,并不是所有的业务都要进行持久化。
@Configuration
public class CommonConfig {// 设置交换机:
@Bean
public DirectExchange exchange(){
// 参数: 交换机的名称、是否持久化、当没有与Queue队列绑定时,是否自动删除
return new DirectExchange("simple.direct",true,false);
}
}
文章图片
- 查看原码持久化:
文章图片
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
文章图片
- 查看原码持久化:
文章图片
文章图片
文章图片
- 查看原码持久化:
文章图片
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,还未来及进行消费消息。从而消息在消费者中丢失了
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
- none模式下:消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq,默认会立马重发消息;没有异常,返回ack。
- manual:自己根据业务请,判断什么时候该ack。
演示:auto模式:
- 添加依赖:
因为这是在确保消费者能收到消息,并且能消费消息。所以应该在consumer消费者中添加消息确认机制:
修改consumer服务的application.yml文件,添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 关闭ack
- 在consumer服务下:进行监听消息,并且接受消息的方法中:模拟一个消息处理异常:
- 在队列中添加一个消息:
文章图片
可以看到有一个消息在队列中了:
文章图片
现在是:消息队列在发送完消息后,在等待消息确认,而其实服务器已经发生了异常。
文章图片
文章图片
四、消费失败重置机制【确保消费者消费消息】 默认的失败重试机制:无限制的:MQ的队列将消息发送给消费者,消费者把消息重新投递给MQ的队列【返回NACK】
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
文章图片
1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。【不返回ACK,也不返回NACK,在本地进行尝试,尝试到成功、或者尝试的上限为止。】 在采取策略:1. 再把消息投递给MQ,人工介入。2.直接抛弃。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
文章图片
重启consumer服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
文章图片
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式,队列和消费者快速的互推
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。频率稍微比上面默认的情况低一些,在本地测试了以后,在返回队列。
- 推荐:RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。(republish重新发布)
文章图片
RepublishMessageRecoverer失败后重新投递到指定交换机中。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
按着springBoot自动装配的原理,想覆盖spring的默认配置,只需要自己定义一个Bean就能覆盖了。 所以说:自己定义一个Bean:MessageRecoverer (消息回收站)就会覆盖默认的:
文章图片
创建new我们要遵循的策略RepublishMessageRecoverer是重发,
参数一:由于重发需要rabbitTemplate.。
参数二和三: 既然重发,指定交换机和路由routingKey(路由:路由和队列匹配就能发送成功。)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue , DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
文章图片
不光有错误的消息,并且还有消费者的错误信息带过来了:
文章图片
总结
如何确保RabbitMQ消息的可靠性?
【微服务的学习|微服务23_服务异步通信01(RabbitMQ消息可靠性)】三个角度来讲:
- 生产者消息弄丢
- MQ消息弄丢
- 消费者把消息弄丢
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack。重试次数耗尽后,会抛弃消息。
- 开启消费者失败重试机制,并设置MessageRecoverer,重试次数耗尽后,后将消息投递到异常交换机,交由人工处理
推荐阅读
- 微服务的学习|微服务08_RabbitMQ的SpringAMQP基本介绍
- 微服务的学习|微服务24_服务异步通信02(RabbitMQ延迟消息的问题-DelayExchange插件)
- 微服务的学习|微服务24_服务异步通信04(RabbitMQ高可用问题、镜像集群、仲裁队列)
- Java|Java面试突击系列(五)(Redis集群模式)
- 面试|上周,XX保险面试,凉了!!!
- 移动开发|校友在美团 Android 岗的四面分享~
- 游戏|直播新玩法背后的音视频技术演进
- android|记录JAVA中Calendar类的一个问题
- 并发编程的理论基石