少年击剑更吹箫,剑气箫心一例消。这篇文章主要讲述#yyds干货盘点#RabbitMQ示例2:工作队列相关的知识,希望能为你提供帮助。
作者:汤圆
个人博客: javalover.cc
工作队列
目录
- 定义
- 基础DEMO
- 知识点:轮询机制、公平机制、消息确认、消息持久
但是有了工作队列,就可以先把请求要处理的任务放到工作队列中,然后快速响应请求
基础DEMO1. 生产者
NewTask.java
public class NewTask private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args)
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ服务器地址(默认也是localhsot)
factory.setHost("localhost");
try
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 这里"hello."包含一个".",表示耗时1S
String message = "hello.";
for (int i = 0;
i <
9;
i++)
// 发布消息
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
System.out.println("send:" + message);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
2. 消费者
Work.java
public class Work
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args)
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("waiting for messages, to exit press CTRL+C");
DeliverCallback callback = (s, delivery)->
String s1 = new String(delivery.getBody(), "utf-8");
System.out.println("received: "+s1);
try
doWork(s1);
catch (InterruptedException e)
e.printStackTrace();
finally
System.out.println("done");
;
channel.basicConsume(QUEUE_NAME, true, callback, consumeTag->
);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
// 有多个".",就耗时多少秒
private static void doWork(String s1) throws InterruptedException
for (String c: s1.split(""))
if(c.equals("."))
Thread.sleep(1000);
知识点 1. 轮询机制
比如有一个生产者,三个消费者;生产者发了九条消息,那么三个消费者每人可以拿到三条消息
与之对应的是公平机制,下面会讲到
示例:将上面的消费者程序Work再复制两份,总共三个消费者
Work1.java
,Work2.java
,Work3.java
步骤:
- 【#yyds干货盘点#RabbitMQ示例2(工作队列)】依次启动这三个消费者
- 最后启动生产者程序
NewTask.java
- 从控制台可以看到,平均每个消费者拿到三条消息
文章图片
2. 消息确认
之所以默认手动,是因为自动存在的隐患(消息丢失):
- 如果消费者执行的任务比较耗时,那么有可能在任务执行过程中,该消费者挂掉;
- 但是此时消费者已经删除了这条消息,那么结果就是这条消息丢失;
- 而且给这个消费者发送的其他后续消息(已确认收到,但是还没处理)也一并丢失了。
下面用表格列出:手动确认和自动确认的区别
手动确认 | 自动确认 | |
---|---|---|
消息丢失 | 不会 | 会(已被确认的消息,会被删除,导致消息丢失) |
消息阻塞 | 会(忘记确认消息,导致该消费者接收的其他后续消息也无法处理) | 不会 |
示例1:修改
Work3.java
,其中1. 和2. 是修改的部分// 2. === begin ===
// 注释掉手动确认的代码,假设忘记了手动确认
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("forget ack");
// 2. === end ===// 1. === begin ===
// 这里将之前的自动确认改为手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumeTag->
);
// 1. === end ===
步骤
- 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)
- 观察控制台,如下所示
文章图片
步骤:
- 上面的例子中,每个消费者都有三条消息,其中Work3的三条消息都还没确认
- 此时中断Work3,可以看到,Work1又执行了1条(总共4条),Work2又执行了2条(总共5条)
文章图片
上面的手动确认,只是保证了消费者如果挂掉,消息不会丢失
但是如果是RabbitMQ服务挂了呢?
可以通过设置参数来保证队列和消息基本不会丢失(不能完全保证不丢失,要完全保证不丢失,可以参考publisher confirms)
// 1. 第二个参数设置为true,生产者和消费者都要改
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 2. basic_properties设置为PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
示例修改
NewTask.java
// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列
private final static String QUEUE_NAME_DURABLE = "work_queue_durable";
// 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在
boolean durable = true;
channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null);
// 3.消息持久化:如果RabbitMQ服务挂了,保证消息还存在;
// 这里将basicProperties属性设置为 PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME_DURABLE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
修改
Work1.java
,Work2.java
,Work3.java
,下面的代码同步修改到三个消费者中// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列
private final static String QUEUE_NAME_DURABLE = "work_queue_durable";
// 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在
boolean durable = true;
channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null);
// 3. 修改消费时的队列名称
channel.basicConsume(QUEUE_NAME_DURABLE, true, callback, consumeTag->
);
4. 公平机制
前面的例子,我们看到,就算消费者忘记手动确认,RabbitMQ还是将消息均匀的分配给了每个消费者(即忘记确认的消费者后续还会收到消息),
其实这是不合理不公平的,因为这样就导致,那些忘记确认的消费者一直占着消息不去处理,造成消息阻塞,RabbitMQ占用内存也会越来越大
示例修改刚才的
Work3.java
,因为Work3.java
中忘记了手动确认,符合这个测试场景// 设置公平分配策略,即消费者确认了一个消息后,RabbitMQ才会给它分配下一个消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
步骤
- 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)
- 观察控制台,可以看到,Work3只接受了一条消息,Work1和Work2分别接收了4条消息
文章图片
推荐阅读
- CentOS 7.9x64 配置ASM安装 Oracle 12cR1
- #yyds干货盘点#CSS-ing Candy Ghost 按钮
- WP get_footer函数不显示页脚
- WP-子主题获取主题目录
- wp-bootstrap-nav walker崩溃在iPad上不起作用
- Wordress-如何使用非WordPress主题
- WordPress-wp_nav_menu container_class不更改容器类
- wordpress(woocomerce)如何向标题添加按钮
- WordPress小部件未显示在自定义主题上