卧疾丰暇豫,翰墨时间作。这篇文章主要讲述重学“消息队列”,详解 RabbitMQ 消息确认机制!相关的知识,希望能为你提供帮助。
引言RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker 中了呢?Broker 又怎么知道消息到底有没有被消费者消费?
如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。
文章图片
01 RabbitMQ的消息确认流程
文章图片
从图中可以看出:
消息确认机制分为生产者确认和消费者确认
- ConfirmCallback 生产者
- ReturnCallback 生产者
- ACK 消费者
- 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
- 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回
- 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送
- rabbitMQTemplate.setConfirmCallback
- rabbitMQTemplate.setReturnCallback
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: root
password: root
#开启两个模式的生产者消息确认
publisher-confirm-type: simple
publisher-returns: true
2.声明交换机、队列,绑定交换机和队列
@Configuration
public class RabbitMQConfig {private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
private static final String SB_TOPIC_QUEUE="sb_topic_queue1";
// 注入交换机 topic类型
@Bean("topicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
.autoDelete().build();
}
// 声明队列
@Bean
public Queue queue1(){
return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
}// 绑定队列和交换机
@Bean
public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
}
3.创建消费者
@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer {@RabbitHandler
public void testPublishConfirm(String msg) {
System.out.println("收到的信息:"+msg);
}
}
4.创建生产者
创建生产者发送消息到消息队列,模拟两种异常情况
@SpringBootTest
class RabiitmqSpringbootApplicationTests {@Autowired
RabbitTemplate template;
@Test
void testConfirmTrue() {
// 设置confirm回调函数
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
if (b) System.out.println("消息发送成功");
else System.out.println("消息发送失败");
}});
// 模拟生产者发送信息--正常情况
template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");
}@Test
void testConfirmFalse() {
// 设置confirm回调函数
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
if (b) System.out.println("消息发送成功");
else System.out.println("消息发送失败");
}});
// 模拟生产者发送信息
// 不存在的交换机--异常情况
template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");
}@Test
void testReturnFalse() {
// 设置return回调函数
template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
System.out.println(message.toString());
System.out.println(s+"*********");
}});
template.setMandatory(true);
// 模拟生产者发送信息
// 正确的交换机 错误的routekey -- 异常情况
template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
}
4.2 消费者确认重点在于消费者的下面两个方法
- channel.basicAck 消费者签收
- channel.basicNAck 消费者拒绝签收
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: root
password: root
#设置消费端手动签收
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
2.创建消费者
/**
* 注入消费者--手动签到
*/
@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer2 {@RabbitHandler
public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
// 消费端设置手动签收代码
try {
System.out.println(msg);
// 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息
// 是哟了那个channel的方法
// 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收
// int i=2/0;
模拟异常
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
System.out.println("消费者签收了该信息,服务器你可以删了");
}catch (Exception e){
// 异常拒绝签收,让mq重发此信息
System.out.println("该信息丢了,给我重发");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
// 该信息丢了,但是不需要你重发
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
}
3.创建生产者
@SpringBootTest
class RabiitmqSpringbootApplicationTests {@Autowired
RabbitTemplate template;
@Test
void testConsumerAck() {
template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
}
}
推荐阅读
- 手把手教你在Windows和Linux下安装Redis及了解Redis基本操作
- 微软想进一步禁售京瓷WP8.1原型机
- WP8.1 GDR2更新将只向新设备推送
- WP8.1 Update 2将不会向现有设备推送
- Windows 8WP8.1版《神偷奶爸:小黄人快跑》停止更新
- Windows8系统降级为Win7系统的注意事项
- win8系统电脑运用超级按钮完成迅速搜索某个应用【图文】
- Win8系统下文件资源管理器的项目复选框的运用小技巧【图文】
- Win8系统内置的安全软件停止更改hosts文件的处理办法【图】