#yyds干货盘点#RabbitMQ示例2(工作队列)

少年击剑更吹箫,剑气箫心一例消。这篇文章主要讲述#yyds干货盘点#RabbitMQ示例2:工作队列相关的知识,希望能为你提供帮助。
作者:汤圆
个人博客: javalover.cc
工作队列 目录

  • 定义
  • 基础DEMO
  • 知识点:轮询机制、公平机制、消息确认、消息持久
定义比如web服务器,某个请求的后台操作可能要耗时十几分钟,此时如果没有工作队列,那么请求就会响应超时
但是有了工作队列,就可以先把请求要处理的任务放到工作队列中,然后快速响应请求
基础DEMO1. 生产者
NewTask.java
public class NewTask private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ服务器地址(默认也是localhsot) factory.setHost("localhost"); try // 创建连接 Connection connection = factory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 这里"hello."包含一个".",表示耗时1S String message = "hello."; for (int i = 0; i < 9; i++) // 发布消息 channel.basicPublish("", QUEUE_NAME,null, message.getBytes()); System.out.println("send:" + message); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace();

2. 消费者
Work.java
public class Work private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("waiting for messages, to exit press CTRL+C"); DeliverCallback callback = (s, delivery)-> String s1 = new String(delivery.getBody(), "utf-8"); System.out.println("received: "+s1); try doWork(s1); catch (InterruptedException e) e.printStackTrace(); finally System.out.println("done"); ; channel.basicConsume(QUEUE_NAME, true, callback, consumeTag-> ); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace(); // 有多个".",就耗时多少秒 private static void doWork(String s1) throws InterruptedException for (String c: s1.split("")) if(c.equals(".")) Thread.sleep(1000);

知识点 1. 轮询机制
比如有一个生产者,三个消费者;生产者发了九条消息,那么三个消费者每人可以拿到三条消息
与之对应的是公平机制,下面会讲到
示例:将上面的消费者程序Work再复制两份,总共三个消费者Work1.java,Work2.java,Work3.java
步骤:
  1. 【#yyds干货盘点#RabbitMQ示例2(工作队列)】依次启动这三个消费者
  2. 最后启动生产者程序NewTask.java
  3. 从控制台可以看到,平均每个消费者拿到三条消息
#yyds干货盘点#RabbitMQ示例2(工作队列)

文章图片

2. 消息确认
之所以默认手动,是因为自动存在的隐患(消息丢失):
  • 如果消费者执行的任务比较耗时,那么有可能在任务执行过程中,该消费者挂掉;
  • 但是此时消费者已经删除了这条消息,那么结果就是这条消息丢失;
  • 而且给这个消费者发送的其他后续消息(已确认收到,但是还没处理)也一并丢失了。
但是手动不存在这个隐患,手动会把挂掉的消费者里面的消息,重新分配到其他消费者
下面用表格列出:手动确认和自动确认的区别
手动确认 自动确认
消息丢失 不会 会(已被确认的消息,会被删除,导致消息丢失)
消息阻塞 会(忘记确认消息,导致该消费者接收的其他后续消息也无法处理) 不会
推荐用手动确认的方式来告诉生产者,我收到消息了(一般在任务完成后,手动回复确认消息)
示例1:修改Work3.java,其中1. 和2. 是修改的部分
// 2. === begin === // 注释掉手动确认的代码,假设忘记了手动确认 // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println("forget ack"); // 2. === end ===// 1. === begin === // 这里将之前的自动确认改为手动确认 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, callback, consumeTag-> ); // 1. === end ===

步骤
  • 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)
  • 观察控制台,如下所示
    #yyds干货盘点#RabbitMQ示例2(工作队列)

    文章图片

示例2:继续上面的示例1,代码不变
步骤:
  • 上面的例子中,每个消费者都有三条消息,其中Work3的三条消息都还没确认
  • 此时中断Work3,可以看到,Work1又执行了1条(总共4条),Work2又执行了2条(总共5条)
    #yyds干货盘点#RabbitMQ示例2(工作队列)

    文章图片

3. 消息持久化
上面的手动确认,只是保证了消费者如果挂掉,消息不会丢失
但是如果是RabbitMQ服务挂了呢?
可以通过设置参数来保证队列和消息基本不会丢失(不能完全保证不丢失,要完全保证不丢失,可以参考publisher confirms)
// 1. 第二个参数设置为true,生产者和消费者都要改 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 2. basic_properties设置为PERSISTENT_TEXT_PLAIN channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

示例修改NewTask.java
// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列 private final static String QUEUE_NAME_DURABLE = "work_queue_durable"; // 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在 boolean durable = true; channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null); // 3.消息持久化:如果RabbitMQ服务挂了,保证消息还存在; // 这里将basicProperties属性设置为 PERSISTENT_TEXT_PLAIN channel.basicPublish("", QUEUE_NAME_DURABLE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

修改Work1.java,Work2.java,Work3.java,下面的代码同步修改到三个消费者中
// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列 private final static String QUEUE_NAME_DURABLE = "work_queue_durable"; // 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在 boolean durable = true; channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null); // 3. 修改消费时的队列名称 channel.basicConsume(QUEUE_NAME_DURABLE, true, callback, consumeTag-> );

4. 公平机制
前面的例子,我们看到,就算消费者忘记手动确认,RabbitMQ还是将消息均匀的分配给了每个消费者(即忘记确认的消费者后续还会收到消息),
其实这是不合理不公平的,因为这样就导致,那些忘记确认的消费者一直占着消息不去处理,造成消息阻塞,RabbitMQ占用内存也会越来越大
示例修改刚才的Work3.java,因为Work3.java中忘记了手动确认,符合这个测试场景
// 设置公平分配策略,即消费者确认了一个消息后,RabbitMQ才会给它分配下一个消息 int prefetchCount = 1; channel.basicQos(prefetchCount);

步骤
  • 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)
  • 观察控制台,可以看到,Work3只接受了一条消息,Work1和Work2分别接收了4条消息
  • #yyds干货盘点#RabbitMQ示例2(工作队列)

    文章图片
参考RabbitMQ官网教程:第二节

    推荐阅读