Rabbitmq打怪升级之路(六)生产者与消费者模型
:亚武de小文 【原创:转载请注明出处】生产者与消费者模型
文章图片
LengToo上学.png
【Rabbitmq打怪升级之路(六)生产者与消费者模型】RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。一、基本模型图
文章图片
[亚武de小文]生产者消费者模型图.png 二、工作流程
- 发送端
1)创建连接 2)创建通道 3)声明队列 4)发送消息
- 接收端
1)创建连接 2)创建通道 3)声明队列 4)监听队列 5)接收消息 6)ack回复
-
MsgProducer.java
package com.yawu.xiaowen.pc; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者-发件人 * @date 2019.06.25 * @author yawu */ public class MsgProducer { private static final String QUEUE_NAME = "mq_pc_hello"; private static final Logger LOGGER = LoggerFactory.getLogger(MsgProducer.class); public static void main(String[] args) throws IOException, TimeoutException {Connection connection = null; Channel channel = null; try { // 连接管理器:应用程序与RabbitMQ建立连接的管理器。 ConnectionFactory factory = new ConnectionFactory(); // 服务器地址 factory.setHost("127.0.0.1"); // 帐号密码:默认为guest/guest,可省略 factory.setUsername("guest"); factory.setPassword("guest"); // 新建连接 connection = factory.newConnection(); // 再创建一个信道 channel = connection.createChannel(); //1、在信道中声明一个队列 /** * 参数详解 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列是否独占连接, * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它属性参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //2、创建一条消息 String message = "Hello,亚武de小文!"; // 3、采用二进制流的方式传输 byte[] msg = message.getBytes("UTF-8"); // 4、channel是一个信道,它接收到msg数据,并将纳入到QUEUE_NAME队列中 /** * 消息发布方法参数详解: * exchange:如果没有指定,则使用Default Exchange * routingKey:消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * props:消息包含的属性 * body:消息体 */ channel.basicPublish("", QUEUE_NAME, null, msg); LOGGER.info("发件人---发送消息:{}", message); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } }} }
-
MsgConsumer.java
package com.yawu.xiaowen.pc; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * 消费者-收件人 * @date 2019.06.25 * @author yawu */ public class MsgConsumer { private static final String QUEUE_NAME = "mq_pc_hello"; private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class); public static void main(String[] args) { try { // 应用程序与RabbitMQ建立连接的管理器。 ConnectionFactory factory = new ConnectionFactory(); // 服务器地址 factory.setHost("127.0.0.1"); // 新建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); //1、首先在通道中申明一个队列 /** * 参数详解 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //2、创建消费者,并重写如何消费的方法,eg:输出消息 //3、首先从信道里面获取数据 Consumer consumer = new DefaultConsumer(channel) { /** * 消费者接收消息调用此方法 * @param consumerTag消费者的标签,在channel.basicConsume()去指定 * @param envelope消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 * (收到消息失败后是否需要重新发送) * @param properties * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); LOGGER.info("收件人---收到消息:{}", message); } }; /** * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中,Unacked * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。 */ channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
文章图片
PC同启1.png
文章图片
PC同启2.png
文章图片
PC同启3.png 2、关闭生产者服务,开启消费者服务
文章图片
P关C启1.png
文章图片
P关C启2.png 3、关闭消费者服务,开启生产者服务
文章图片
P启C关1.png
该信息处于队列中等待状态,等待消费者消费
文章图片
P启C关2.png
文章图片
P启C关3.png 4、服务都保持启动
- 设置autoAck参数为false
/** * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中 * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。 */ channel.basicConsume(QUEUE_NAME, false, consumer);
文章图片
PC同启_autoAck1.png
- 生产者发送多条信息(此处我发出五条消息)
文章图片
PC同启_autoAck2.png
推荐阅读
- jhipster|jhipster 升级无效问题
- gitlab|Gitlab升级(12.2.1到14.6.4)
- GIS跨界融合赋能多领域技术升级,江淮大地新应用成果喜人
- 认知升级,向钱一步
- (29)学习《手机成为诱因,家庭矛盾升级,亲子冲突何时结束(》收获)
- 偏向锁、轻量级锁、重量级锁的升级以及区别
- 妇女节,女生节,女神节,配置一度再升级
- mac升级之(xcrun:|mac升级之:xcrun: error: invalid active developer path, missing xcrun)
- 22天,来一次思想行动升级!
- 总裁爹地我们不约|总裁爹地我们不约 第八章 升级秘书长