RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解
交换机属性
交换机属性:
name: 交换机名称
type: 交换机类型 direct,topic,fanout,headers
durability: 是否需要持久化,true 为持久化
auto delete: 当最后一个绑定到 exchange 上的队列被删除后,exchange 没有绑定的队列了,自动删除该 exchange
internal: 当前 exchange 是否用于 rabbitMQ 内部使用,默认为 false
arguments: 扩展参数,用于扩展 AMQP 协议自制定化使用
1.direct exchange类型
direct exchange: 所有发送到 direct exchange 的消息被转发到 routing key 中指定的queue
注意:direct模式可以使用 rabbitMQ 自带的 exchange:default exchange,所以不需要将 exchange 进行任何绑定(binding)操作,消息传递时,routingkey 必须完全匹配才会被队列接收,否则该消息会被抛弃。
流转示意图如下
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/dafd62b36ffd40e7b4292f0b6df25d0c.jpg)
文章图片
image
代码地址:https://github.com/hmilyos/rabbitmqdemo.gitrabbitmq-api 项目下
消费端代码:
public class ConsumerDirectExchange {private static final Logger log = LoggerFactory.getLogger(ConsumerDirectExchange.class);
// 声明
public final static String EXCHANGE_NAME = "test_direct_exchange";
public final static String EXCHANGE_TYPE = "direct";
public final static String QUEUE_NAME = "test_direct_queue";
public final static String ROUTING_KEY = "test.direct";
public final static String ROUTING_KEY_ERROR = "test.direct.error";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
ConsumerCancelledException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 表示声明了一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
// 表示声明了一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 建立一个绑定关系:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("收到消息:{}", msg);
}
}
}
启动消费端
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/75152244207d4e4c9b5480e81ad62bb8.jpg)
文章图片
image 上管控台查看交换机和队列是否成功创建
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/855263be84824d7c901d903cca5d763e.jpg)
文章图片
image 点击进去查看绑定情况
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/bc28714654434e72a896afbd4df34749.jpg)
文章图片
image 生产端代码
public class ProducerDirectExchange {private final static Logger log = LoggerFactory.getLogger(ProducerDirectExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello World RabbitMQDirect Exchange test.direct Message... ";
log.info("生产端发送了:{}", msg);
channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY, null, msg.getBytes());
//channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY_ERROR, null, msg.getBytes());
channel.close();
connection.close();
}}
然后把生产端run一下
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/01352cdea05942d59dce40aabef5b13a.jpg)
文章图片
image 再查看消费端的日志
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/1a8d2b75a34d46c9ad376d0d2146264a.jpg)
文章图片
image 该消费端只接收 routingkey 为 test.direct 的消息,证明 direct exchange 类型的,routingkey 必须完全匹配才会被队列接收,否则该消息会被抛弃。
2.topic exchange 类型 【RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解】topic exchange: 所有发送到 topic exchange 的消息被转发到所有关心 routingkey 中 topic 的 queue 上
exchange 将 routingkey 和某 topic 进行模糊匹配,此时队列需要绑定一个 topic。
注意: topic 可以使用通配符进行模糊匹配
# 匹配一个或多个词,注意是词
* 只能匹配一个词
例如 “log.#” 能匹配到 “log.info.oa”
“log.*” 只能匹配到 “log.erro” 这种格式
具体示例图如下图,usa.news 能被 usa.#,#.news 所消费,usa.weather 能被 usa.#,#.weather 所消费...
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/d44e3a1fa3de464f99dde8d752a0ad45.jpg)
文章图片
image 代码示例:
消费端:
public class ConsumerTopicExchange {private final static Logger log = LoggerFactory.getLogger(ConsumerTopicExchange.class);
// 声明
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String QUEUE_NAME = "test_topic_queue";
public static final String ROUTING_KEY_one = "user.#";
public static final String ROUTING_KEY = "user.*";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
ConsumerCancelledException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 1 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
// 2 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 3 建立交换机和队列的绑定关系:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费端收到消息:{}", msg);
}
}
}
启动消费端,上管控台查看创建、绑定是否成功
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/59dd71b37c014bf0b7f00b954f5f7b7f.jpg)
文章图片
image 确认成功后,编写生产端代码
public class ProducerTopicExchange {private final static Logger log = LoggerFactory.getLogger(ProducerTopicExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 声明
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
String msg1 = "Hello World RabbitMQTopic Exchange Message ..." + routingKey1;
String msg2 = "Hello World RabbitMQTopic Exchange Message ..." + routingKey2;
String msg3 = "Hello World RabbitMQTopic Exchange Message ..." + routingKey3;
log.info("生产端, {} :{}", routingKey1, msg1);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey1, null, msg1.getBytes());
log.info("生产端, {} :{}", routingKey2, msg2);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey2, null, msg2.getBytes());
log.info("生产端, {} :{}", routingKey3, msg3);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey3, null, msg3.getBytes());
channel.close();
connection.close();
}
}
启动生产端
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/3182fde5428b45a6b769c5c58e5b7bab.jpg)
文章图片
image 消费端接收到的
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/2e81bb1a05c64bc7a49b4d4e444199ac.jpg)
文章图片
image routingKey3 ="user.delete.abc" 的未被接收,符合 user.* 的规则
这时候在消费端把 routingKey 修改一下, routingKey ="user.#",重启消费端,上管控台
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/1cad6da10e1348e5a14a0f02b7aaad0b.jpg)
文章图片
image 发现之前 * 的并没有解绑,需要我们手动解绑一下,然后再启动生产端的代码
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/eb9f9811b2244c309af32af01fecb1f9.jpg)
文章图片
image 发现三条都能接收到了,符合 # 的规则。
3.fanout exchange 类型 fanout exchange: 不处理路由键,只需要简单的将队列绑定到交换机上,发送到该交换机的消息都会被转发到于该交换机绑定的所有队列上,fanout 交换机由于不需要进行routingkey 的对比 直接发送所以绑定的 queue,所以转发消息是最快的
示意图如下图所示
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/578da8500a534b3d8c7e5c443e2449e8.jpg)
文章图片
image 代码实现:
public class ConsumerFanoutExchange {
private static final Logger log = LoggerFactory.getLogger(ConsumerFanoutExchange.class);
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static final String EXCHANGE_TYPE = "fanout";
public static final String QUEUE_NAME = "test_fanout_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//不设置路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
log.info("消费端启动。。。");
//循环获取消息
while (true) {
//获取消息,如果没有消息,这一步将会一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费端收到消息:{}", msg);
}
}
}
生产端代码:
public class ProducerFanoutExchange {private static final Logger log = LoggerFactory.getLogger(ProducerFanoutExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 5 发送
for (int i = 0;
i < 10;
i++) {
String msg = "Hello World RabbitMQFANOUT Exchange Message ...";
log.info("生产端,routingKey{}: {}", i, msg);
channel.basicPublish(ConsumerFanoutExchange.EXCHANGE_NAME, "" + i, null, (msg + i).getBytes());
}
channel.close();
connection.close();
}
}
先启动消费端,再启动生产端
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/5a1bf5eb8e2e4b7f8cccb1c4d8c81e69.jpg)
文章图片
image 查看消费端的日志
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/bb9998227b75432eb876abe2f4d64960.jpg)
文章图片
image routingkey0-9 的都能被就收,也就相当于该交换机上所有的队列都能接收来到该交换机的消息。
headers 类型的不常用,就不介绍了
5.binding binding: 绑定 exchange 和 exchange/queue 之间的连接关心。binding 中可以包含 routingkey 或者参数
6. Queue queue: 消息队列,实际存储消息数据,durability 表示是否持久化,durable 表示是,transient 表示否。auto delete: 如选择 yes,表示当最后一个监听被移除后,该 queue 会被自动删除。
7. Message message: 服务器和应用程序之间传送的数据 本质上就是一段数据,由 properties 和 payload(body) 组成
常用属性: delivery mode,headersheaders(自定义属性),content_type,content_encoding,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id
代码实现:
消费端:
public class Consumer {private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) throws IOException, TimeoutException,
ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
//7 获取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费端: " + msg);
Map headers = delivery.getProperties().getHeaders();
log.info("headers get myHeaders1 value: " + headers.get("myHeaders1"));
log.info("headers get myHeaders2value: " + headers.get("myHeaders2"));
//Envelope envelope = delivery.getEnvelope();
}}}
生产端:
public class Procuder {private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3 通过connection创建一个Channel
Channel channel = connection.createChannel();
Map headers = new HashMap<>();
headers.put("myHeaders1", "111");
headers.put("myHeaders2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8")
.expiration("10000").headers(headers).build();
// 4 通过Channel发送数据
for (int i = 0;
i < 5;
i++) {
String msg = "Hello RabbitMQ!";
// 1 exchange 2 routingKey
log.info("生产端,test001: {}", msg);
channel.basicPublish("", "test001", properties, msg.getBytes());
}
// 5 记得要关闭相关的连接
channel.close();
connection.close();
}
}
先启动消费端,上管控台确认交换机和队列是否创建和绑定成功,再启动生产端,消费端接收到如下的信息
![RabbitMQ|RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解](https://img.it610.com/image/info10/83a93561dc1a494a964db84a1cddaeae.jpg)
文章图片
image 8. virtual host virtual host 虚拟主机
虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个 virtual host 里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue
推荐阅读
- 一个人的碎碎念
- 野营记-第五章|野营记-第五章 讨伐梦魇兽
- Shell-Bash变量与运算符
- 清明,是追思、是传承、是感恩。
- 牛人进化+|牛人进化+ 按自己的意愿过一生
- 七老修复好敏感、角质层薄、红血丝
- 华为旁!大社区、地铁新盘,佳兆业城市广场五期!
- 标签、语法规范、内联框架、超链接、CSS的编写位置、CSS语法、开发工具、块和内联、常用选择器、后代元素选择器、伪类、伪元素。
- 螃蟹和这些食物同吃,轻则腹泻、重则中毒!要小心哦~
- 八、「料理风云」