RabbitMQ实例C#

驱动组件.NET版本

官网推荐驱动:RabbitMQ.Client
https://www.rabbitmq.com/devtools.html#dotnet-dev
Connection和Channel

Connection是一个TCP连接,一般服务器这种资源都是很宝贵的,所以提供了Channel,完成消息的发布消费。这样Connection就可以做成单例模式的。
1、事件
Connection和Channel里面包含了几个事件。分别在不同的情况下触发

其他时间执行发生异常,就会执行这个
Connection.CallbackException
恢复连接成功
Connection.RecoverySucceeded
连接恢复异常时会触发这个事件
Connection.ConnectionRecoveryError
RabbitMQ出于自身保护策略,通过阻塞方式限制写入,导致了生产者应用“假死”,不对外服务。比若说CPUIO RAM下降,队列堆积,导致堵塞。就会触发这个事件
Connection.ConnectionBlocked
阻塞解除会触发这个事件
Connection.ConnectionUnblocked
connection断开连接时候
Connection.ConnectionShutdown
-------------------------------------------------------------------------------------------------------------------------------------------------------
.NET RabbitMQ.Client中Channel叫做Model

channel断开连接时候触发
Channel.ModelShutdown
【RabbitMQ实例C#】其他时间执行发生异常,就会执行这个
Channel.CallbackException
broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
Channel.BasicReturn
Channel.BasicRecoverOk
Signalled when a Basic.Nack command arrives from the broker.
Channel.BasicNacks
Signalled when a Basic.Ack command arrives from the broker.
Channel.BasicAcks
Channel.FlowControl
2、属性
最大channel数量
connetion.ChannelMax
服务上这个连接的对象属性
connetion.ClientProperties
服务器上这个连接的名字
connetion.ClientProvidedName
关闭原因
connetion.CloseReason
端口
connetion.Endpoint
和客户端通信时所允许的最大的frame size
connetion.FrameMax
连接的心跳包
connetion.Heartbeat
是否打开
connetion.IsOpen
获取vhost
connetion.KnownHosts
本地端口
connetion.LocalPort
连接串使用的协议
connetion.Protocol
远程端口,服务器
connetion.RemotePort
服务器属性
connetion.ServerProperties
关停信息
connetion.ShutdownReport
----------------------------------------------------------------------------------------------------------------------------------------
channel编号
channel.ChannelNumber
关闭原因
channel.CloseReason
连接超时时间
channel.ContinuationTimeout
channel.IsClosed
channel.IsOpen
下一个消息编号
channel.NextPublishSeqNo
3、方法
终止连接以及他们的channel,可以指定时间长度。
connetion.Abort()
关闭连接以及他的channel
connetion.Close()
创建channel
connetion.CreateModel()
connetion.HandleConnectionBlocked()
connetion.HandleConnectionUnblocked()

发送消息Confirm模式

目的确认消息是否到达消息队列中
RabbitMQ实例C#
文章图片


1、mandatory
broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
channel.BasicReturn += Channel_BasicReturn; channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);


private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { Console.WriteLine("Channel_BasicReturn"); }


2、普通Confirm模式
channel.ConfirmSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body); if (channel.WaitForConfirms()) { Console.WriteLine("普通发送方确认模式"); }


3、批量Confirm模式
channel.ConfirmSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.WaitForConfirmsOrDie(); Console.WriteLine("普通发送方确认模式");



4、异步Confirm模式
java版本组件有
5、事物

try { //声明事物 channel.TxSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); //提交事物 channel.TxCommit(); } catch (Exception) { //回滚 channel.TxRollback(); }

上面说的是生产者发布消息确认,那么消费者消费如何确认呢,大家都知道消费者有ack机制,但是用到事物的时候,是怎样的呢
1.autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么已事务回滚为主,此条消息会重新放回队列;
2.autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

事物比较耗性能

简单消息发送

static void Main(string[] args) {ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.140.161"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "TestVHost"; //创建connetion using (var connetion = factory.CreateConnection()) { connetion.CallbackException += Connetion_CallbackException; connetion.RecoverySucceeded += Connetion_RecoverySucceeded; connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError; connetion.ConnectionBlocked += Connetion_ConnectionBlocked; connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked; //连接关闭的时候 connetion.ConnectionShutdown += Connetion_ConnectionShutdown; //创建channel using (var channel = connetion.CreateModel()) {//消息会在何时被 confirm? //The broker will confirm messages once: //broker 将在下面的情况中对消息进行 confirm : //it decides a message will not be routed to queues //(if the mandatory flag is set then the basic.return is sent first) or //broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return) //a transient message has reached all its queues(and mirrors) or //非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中) //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or //持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync) //a persistent message has been consumed(and if necessary acknowledged) from all its queues //持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)//broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return) channel.BasicReturn += Channel_BasicReturn; //(可以不声明)如果不声明交换机 ,那么就使用默认的交换机(每一个vhost都会有一个默认交换机) //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true); //创建一个队列bool durable(持久化), bool exclusive(专有的), bool autoDelete(自动删除) //channel.QueueDeclare("TestQueue", true, false, false, null); //不做绑定的话,使用默认的交换机。 //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null); //发布消息 var body = Encoding.UTF8.GetBytes("西伯利亚的狼"); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); }Console.WriteLine("Hello World!"); Console.ReadKey(); } }private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { Console.WriteLine("Channel_BasicReturn"); }private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("Connetion_ConnectionShutdown"); }private static void Connetion_ConnectionUnblocked(object sender, EventArgs e) { Console.WriteLine("Connetion_ConnectionUnblocked"); }private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e) { Console.WriteLine("Connetion_ConnectionBlocked"); }private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e) { Console.WriteLine("Connetion_ConnectionRecoveryError"); }private static void Connetion_RecoverySucceeded(object sender, EventArgs e) { Console.WriteLine("Connetion_RecoverySucceeded"); }private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e) { Console.WriteLine("Connetion_CallbackException"); }



场景分析


RabbitMQ实例C#
文章图片

消息持久化

Broker持久化、交换机持久化、队列持久化。目的是维持重启后这些东西的存在。
消息持久化,才是把消息持久化到硬盘中,因为消息在队列中,所以需要队列持久化。
设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).默认的就是持久化。
//发布消息 var body = Encoding.UTF8.GetBytes("西伯利亚的狼"); BasicProperties pro = new BasicProperties(); pro.DeliveryMode = 2; channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);


消费者消费消息

RabbitMQ实例C#
文章图片

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。但如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。
RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会重新分发消息。
ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.140.161"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "TestVHost"; //创建connetion using (var connetion = factory.CreateConnection()) {using (var channel = connetion.CreateModel()) { //构造消费者实例 var consumer = new EventingBasicConsumer(channel); //绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000); //模拟耗时 Console.WriteLine(" [x] Done"); // 主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //启动消费者 channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }using (var connetion = factory.CreateConnection()) {using (var channel = connetion.CreateModel()) { //构造消费者实例 var consumer = new EventingBasicConsumer(channel); //绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000); //模拟耗时 Console.WriteLine(" [x] Done"); }; //启动消费者 channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }



消费负载均衡

RabbitMQ实例C#
文章图片

1、当一个队列有多个消费者时,队列会以循环(round-robin)的方式发送给消费者。每条消息只会给一个订阅的消费者。
2、默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做循环(round-robin)。
3、RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。
但我们可以通过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。
分库分表模式

比如说客户积分同步。
一般电商中这种数据量比较大,及时性比较高。
ID 编号 1-10000的用户积分表更放在队列1,10001-20000放在队列2,不同的消费者消费不同队列。以此类推...
RPC

RabbitMQ实例C#
文章图片

第一步,主要是进行远程调用的客户端需要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。
远程调用客户端:
//申明唯一guid用来标识此次发送的远程调用请求 var correlationId = Guid.NewGuid().ToString(); //申明需要监听的回调队列 var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueue; //指定回调队列 properties.CorrelationId = correlationId; //指定消息唯一标识 string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //发布消息 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"[*] Request fib({number})"); // //创建消费者用于处理消息回调(远程调用返回结果) var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。 if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"[x]: {responseMsg}"); } };

远程调用服务端:
//申明队列接收远程调用请求 channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message."); //请求处理逻辑 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定 var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //将远程调用结果发送到客户端监听的队列上 channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //手动发回消息确认 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);


转载于:https://www.cnblogs.com/wudequn/p/10886733.html

    推荐阅读