springboot中rabbitmq实现消息可靠性机制详解
1. 生产者模块通过publisher confirm机制实现消息可靠性
1.1 生产者模块导入rabbitmq相关依赖
org.springframework.boot spring-boot-starter-amqpcom.fasterxml.jackson.core jackson-databind
1.2 配置文件中进行mq的相关配置
spring.rabbitmq.host=10.128.240.183spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/spring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true
- publish-confirm-type:开启publisher-confirm,有以下可选值
correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback
- publish-returns:开启publish-return功能。可以定义ReturnCallback
- template.mandatory: 定义消息路由失败的策略
false:直接丢弃消息
1.3 定义ReturnCallback(消息投递到队列失败触发此回调)
- 每个RabbitTemplate只能配置一个ReturnCallback。
- 当消息投递失败,就会调用生产者的returnCallback中定义的处理逻辑
- 可以在容器启动时就配置这个回调
@Slf4j@Configurationpublic class CommonConfig implements ApplicationContextAware { @Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return; }// 记录日志log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息}); }}
1.4 定义ConfirmCallback(消息到达交换机触发此回调)
可以为redisTemplate指定一个统一的确认回调
@Slf4j@Configurationpublic class CommonConfig implements ApplicationContextAware { @Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return; }// 记录日志log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息}); // 设置统一的confirm回调。只要消息到达broker就ack=truerabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("这是统一的回调"); System.out.println("correlationData:" + correlationData); System.out.println("ack:" + b); System.out.println("cause:" + s); }}); }}
也可以为特定的消息定制回调
@Autowiredprivate RabbitTemplate rabbitTemplate; @Testpublic void testmq() throws InterruptedException { CorrelationData correlationData = https://www.it610.com/article/new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result->{if (result.isAck()) {// ACKlog.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId()); } else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId()); // 重发消息}},ex->{// 记录日志log.error("消息发送失败!", ex); // 重发消息}); rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData); }
2. 消费者模块开启消息确认 2.1 添加配置
# 手动ack消息,不使用默认的消费端确认spring.rabbitmq.listener.simple.acknowledge-mode=manual
- none:关闭ack,消息投递时不可靠的,可能丢失
- auto:类似事务机制,出现异常时返回nack,消息回滚到mq,没有异常,返回
- ackmanual:我们自己指定什么时候返回ack
@RabbitListener(queues = "order.release.order.queue")@Servicepublic class OrderCloseListener { @Autowiredprivate OrderService orderService; @RabbitHandlerprivate void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn()); try {orderService.closeOrder(orderEntity); // 第二个参数为false则表示仅确认此条消息。如果为true则表示对收到的多条消息同时确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) {// 第二个参数为ture表示将这个消息重新加入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); }}}
3. 消费者模块开启消息失败重试机制 3.1 配置文件添加配置,开启本地重试
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- 开启本地重试,如果消息处理过程总抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,spring会返回ack,消息会被丢弃
- 当开启本地重试后,重试最大次数后消息直接丢弃。
- 三种策略,都继承于MessageRecovery接口
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct"); }@Beanpublic Queue errorQueue(){return new Queue("error.queue", true); } // 路由键为key@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
4.3 向容器中添加一个失败策略组件
@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){// error为路由键return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
到此这篇关于springboot中rabbitmq实现消息可靠性的文章就介绍到这了,更多相关springboot rabbitmq消息可靠性内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- 热闹中的孤独
- Shell-Bash变量与运算符
- JS中的各种宽高度定义及其应用
- 2021-02-17|2021-02-17 小儿按摩膻中穴-舒缓咳嗽
- 深入理解Go之generate
- 异地恋中,逐渐适应一个人到底意味着什么()
- Activiti(一)SpringBoot2集成Activiti6
- 我眼中的佛系经纪人
- 《魔法科高中的劣等生》第26卷(Invasion篇)发售
- “成长”读书社群招募