Rabbitmq打怪升级之路(十一)Topic-主题通配符模式

:亚武de小文 【原创:转载请注明出处】
主题交换机模式(Topic) 通配符工作模式 Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
文章图片
LengToo上学.png RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC
Topic 模型图 Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
文章图片
[亚武de小文]Topic模型图.png Producer:生产者,是发消息方。
Exchanger:路由器交换机,此处类型为topic,它会把消息的路由键取出来,与绑定它的队列做路由键匹配。如果队列关心的路由键能匹配上消息的路由键,则将这个消息投递到这个队列中。
Queue01:一个队列,它是关心orange颜色的队列。
Queue02:一个队列,它是关心rabbit与lazy的队列。
Consumer01:消费者,从Queue01中获取消息。
Consumer01:消费者,从Queue02中获取消息。
* 可以替代一个单词
# 可以替换零个或多个单词
主题交换机模式:
1、每个消费者监听自己的队列,并且设置带通配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
参考代码 生产者
  • 队列绑定交换机指定通配符:【统配符规则】 中间以“.”分隔。符号#可以匹配多个词,符号*可以匹配一个词语。
    声明交换机,指定topic类型:
  • Producer.java
    package com.yawu.xiaowen.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Topic交换机 * 生产者 * * @author yawu * @date 2019.06.30 */ public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; public static void main(String[] args) { try { // RabbitMQ建立连接的管理器 ConnectionFactory factory = new ConnectionFactory(); // 设置服务器地址 factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); String message = "开始发送信息-topic交换机"; //声明一个TOPIC类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 向交换机发送消息 // 第一种情况:发送【*.orange.*】通配符消息 channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8")); // 第二种情况:发送【lazy.#】通配符消息 //channel.basicPublish(EXCHANGE_NAME, "lazy.boy.girl", null, message.getBytes("UTF-8")); LOGGER.info("消息发送成功:{}", message); channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } }}

消费者
  • Consumer01.java
    package com.yawu.xiaowen.topic; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Topic交换机 * 消费者01 * @date 2019.06.30 * @author yawu */ public class Consumer01 { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; private static final String QUEUE_NAME_01 = "mq_topic_queue_01"; private static final String QUEUE_NAME_02 = "mq_topic_queue_02"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个Topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明两个队列 /** * 【参数详解】 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME_01, true, false, false, null); channel.queueDeclare(QUEUE_NAME_02, true, false, false, null); final String ROUTING_KEY_ORANGE = "*.orange.*"; final String ROUTING_KEY_LAZY = "lazy.#"; // 队列一对ORANGE感兴趣,匹配XXX.orange.XXX 的消息 channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE); // 队列二对LAZY感兴趣,匹配lazy.XXX.XXX.XXX channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body, "UTF-8"); LOGGER.info("队列一收到消息:{}", message); } }; // 队列一确认消息 channel.basicConsume(QUEUE_NAME_01, true, consumer); } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } } }

  • Consumer02.java
    package com.yawu.xiaowen.topic; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Topic交换机 * 消费者02 * @date 2019.06.30 * @author yawu */ public class Consumer02 { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; private static final String QUEUE_NAME_01 = "mq_topic_queue_01"; private static final String QUEUE_NAME_02 = "mq_topic_queue_02"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个Topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明两个队列 /** * 【参数详解】 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME_01, true, false, false, null); channel.queueDeclare(QUEUE_NAME_02, true, false, false, null); final String ROUTING_KEY_ORANGE = "*.orange.*"; final String ROUTING_KEY_LAZY = "lazy.#"; // 队列一对ORANGE感兴趣,匹配XXX.orange.XXX 的消息 channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE); // 队列二对LAZY感兴趣,匹配lazy.XXX.XXX.XXX channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body, "UTF-8"); LOGGER.info("队列二收到消息:{}", message); } }; //队列一确认消息 channel.basicConsume(QUEUE_NAME_02, true, consumer); } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } } }

测试及运行分析
  1. 启动生产者服务:(先以第一种情况*.orange.*发送)

    Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
    文章图片
    启动生产者服务.png
  2. 打开RabbitMQ的管理界面,查看绑定关系

    Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
    文章图片
    查看绑定关系.png
  3. 发现第一种情况自动匹配到了Queue01

    Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
    文章图片
    匹配队列01.png
  4. 【同理】,第二种情况此处略过。
    启动消费者服务,对消息进行消费

    Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
    文章图片
    启动消费者.png
    Rabbitmq打怪升级之路(十一)Topic-主题通配符模式
    文章图片
    消息消费.png
  5. 【Rabbitmq打怪升级之路(十一)Topic-主题通配符模式】至此,Topic主题通配符交换机学习结束。

    推荐阅读