Rabbitmq消息发送事务与确认机制
2019独角兽企业重金招聘Python工程师标准>>>
文章图片
默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器 ,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
1、通过事务机制实现。
2、通过发送方确认 publisher confirm 机制实现。
1、事务机制
开启事务后,客户端和RabbitMQ之间的通讯交互流程:
- 客户端发送给服务器Tx.Select(开启事务模式)
- 服务器端返回Tx.Select-Ok(开启事务模式ok)
- 推送消息
- 客户端发送给事务提交Tx.Commit
- 服务器端返回Tx.Commit-Ok
声明交换机、队列并绑定
/**
* 1、声明交换机、队列并绑定
*/
@org.junit.Test
public void decalreExchange() throws Exception {String exchange = "hello_tx";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,new HashMap<>());
String queueName = "hello_tx_c";
// 声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列到交换机
String routingKey = "aaa";
channel.queueBind(queueName, exchange, routingKey,null);
}
发送消息:这里通过 1/0 来产生异常
/**
* 生产者发送消息
* @throws Exception
*/
@org.junit.Test
public void sendMessage() throws Exception {
String exchange = "hello_tx";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 消息内容
try {
channel.txSelect();
//开启事务
String message1 = "Less is more tx ";
// 发布消息到Exchange 指定路由键
channel.basicPublish(exchange, "aaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message1.getBytes());
int i = 1/0;
channel.confirmSelect();
//提交事务}catch (Exception e) {
log.error("error:",e);
channel.txRollback();
//回滚
}channel.close();
connection.close();
}
可以看到,发送方出现异常,消息并没有发送到rabbitmq的队列里。
文章图片
文章图片
那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低2~10倍的性能。
2、发送方确认模式
基于事务的性能问题,RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。
原理:生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),由这个id在生产者和RabbitMQ之间进行消息的确认。
confirm模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息决定下一步的处理。
如何使用
【Rabbitmq消息发送事务与确认机制】首先声明交换机、队列并绑定
/**
* 1、声明交换机、队列并绑定
*/
@org.junit.Test
public void decalreExchange() throws Exception {String exchange = "hello_confirm";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
String queueName = "hello_confirm_c";
// 声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列到交换机
String routingKey = "aaa";
channel.queueBind(queueName, exchange, routingKey, null);
}
通过下面的代码,进行测试
/**
* 确认发送1条消息
*
* @throws Exception
*/
@org.junit.Test
public void sendMessage1() throws Exception {
String exchange = "hello_confirm";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
channel.confirmSelect();
// 消息内容
String message1 = "Less is more confirm ";
// 发布消息到Exchange 指定路由键
channel.basicPublish(exchange, "aaa", null, message1.getBytes());
if (channel.waitForConfirms()){//等待回复
log.debug("发送成功");
}else{
log.debug("发送失败");
}
channel.close();
connection.close();
}/**
* 批量确认发送消息
*
* @throws Exception
*/
@org.junit.Test
public void sendMessage2() throws Exception {
String exchange = "hello_confirm";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
channel.confirmSelect();
for (int i= 1;
i <= 3;
i++){
// 发布消息到Exchange 指定路由键
// 消息内容
String message = "Less is more confirm " + i;
channel.basicPublish(exchange, "aaa", null, message.getBytes());
}if (channel.waitForConfirms()){//批量确认
log.debug("发送成功");
}else{
log.debug("发送失败");
}
channel.close();
connection.close();
}/**
* 添加确认监听器
*
* @throws Exception
*/
@org.junit.Test
public void sendMessage3() throws Exception {
String exchange = "hello_confirm";
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
log.debug("deliveryTag:{},multiple:{}",deliveryTag,multiple);
}public void handleNack(long deliveryTag, boolean multiple)
throws IOException {}
});
// 发布消息到Exchange 指定路由键
for (int i= 1;
i <= 3;
i++){
// 发布消息到Exchange 指定路由键
// 消息内容
String message = "Less is more confirm " + i;
channel.basicPublish(exchange, "aaa", null, message.getBytes());
}if (channel.waitForConfirms()){//批量确认
log.debug("发送成功");
}else{
log.debug("发送失败");
}channel.close();
connection.close();
}
详细源码地址
https://github.com/suzhe2018/rabbitmq-item
文章图片
转载于:https://my.oschina.net/suzheworld/blog/3003370
推荐阅读
- 危险也是机会
- python学习之|python学习之 实现QQ自动发送消息
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 夏夜|夏夜 我们
- Vue组件之事件总线和消息发布订阅详解
- Redis——发布订阅/消息队列
- Java消息中间件概念基础
- 【20190827复盘】——好消息
- MQ(消息队列)功能介绍
- 前任结婚的消息,究竟有多大杀伤力()