Springboot整合RabbitMQ(三)——Topic主题交换机
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键是有规则的。简单地介绍下规则:
*
(星号) 用来表示一个单词 (必须出现的)
#
(井号) 用来表示任意数量(零个或多个)单词
通配的路由键是跟队列进行绑定的,举个小例子:
队列Q1路由键为*.TT.*
,队列Q2路由键为TT.#
;
如果一条消息携带的路由值为A.TT.B
,那么队列Q1将会收到;
如果一条消息携带的路由值为TT.AA.BB
,那么队列Q2将会收到;
当一个队列的绑定键为#
(井号)的时候,这个队列将会无视消息的路由键,接收所有的消息。
当*
(星号)和#
(井号)这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。所以主题交换机也就实现了扇形交换机和直连交换机的功能。
1、Maven依赖
org.springframework.boot
spring-boot-starter-parent
2.1.4.RELEASE
org.springframework.boot
spring-boot-starter-amqp
2、配置文件
server:
port: 30200spring:
rabbitmq:
host: 148.70.153.63
port: 5672
username: libai
password: password
3、生产者 3.1、配置文件 声明队列和交换机。
@Configuration
public class TopicRabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("topicQueue1", true, false, false);
}@Bean
public Queue secondQueue() {
return new Queue("topicQueue2", true, false, false);
}@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange", true, false);
}// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
public Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
}// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
public Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
3.2、发送消息
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 使用RabbitTemplate,这提供了接收/发送等等方法@PostMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
Map manMap = new HashMap<>();
manMap.put("messageId", String.valueOf(UUID.randomUUID()));
manMap.put("messageData", "man");
manMap.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
// 将消息携带路由键值:topic.man 发送到交换机topicExchange
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}@PostMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
Map womanMap = new HashMap<>();
womanMap.put("messageId", String.valueOf(UUID.randomUUID()));
womanMap.put("messageData", "woman");
womanMap.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
// 将消息携带路由键值:topic.woman 发送到交换机topicExchange
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
}
根据主题交换机的特性,携带路由键
topic.man
的消息将会发送到topicQueue1
和topicQueue2
这2个队列中,携带路由键topic.woman
的消息只会发送到topicQueue2
队列中。启动服务,用
postman
分别调用发送消息的2个接口。3.3、查看
RabbitMQ
的后台管理界面
可以看到已经有3条消息推送到队列中,等待被消费。文章图片
消息预览 3.4、查看队列
topicQueue1
上有1条消息等待被消费,topicQueue2
上则有2条消息等待被消费。文章图片
查看队列1 【Springboot整合RabbitMQ(三)——Topic主题交换机】
文章图片
查看队列2 4、消费者 通过注解
@RabbitListener
指定要消费的队列。@Component
@RabbitListener(queues = "topicQueue1")
@Slf4j
public class TopicReceiver1 {
@RabbitHandler
public void process(Map testMessage) {
log.info("TopicReceiver1消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
@Component
@RabbitListener(queues = "topicQueue2")
@Slf4j
public class TopicReceiver2 {
@RabbitHandler
public void process(Map testMessage) {
log.info("TopicReceiver2消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
重新启动服务,可以看到控制台打印输出,说明该条消息已经被消费者消费成功了。
2020-09-13 22:41:45,893 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#4-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver1:25] [] TopicReceiver1消费者收到消息:{
"createTime": "2020-09-13 22:35:07",
"messageId": "6a5a2402-8dca-42c5-8778-f60c082e8a53",
"messageData": "man"
}
2020-09-13 22:41:46,056 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver2:25] [] TopicReceiver2消费者收到消息:{
"createTime": "2020-09-13 22:35:07",
"messageId": "6a5a2402-8dca-42c5-8778-f60c082e8a53",
"messageData": "man"
}
2020-09-13 22:41:46,057 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver2:25] [] TopicReceiver2消费者收到消息:{
"createTime": "2020-09-13 22:35:09",
"messageId": "c179f30a-9746-4d9e-9617-c667d26d74be",
"messageData": "woman"
}
Springboot整合RabbitMQ(一)——Direct直连交换机
Springboot整合RabbitMQ(二)——Fanout扇形交换机
参考链接
- Springboot 整合RabbitMq ,用心看完这一篇就够了
- github:https://github.com/senlinmu1008/rabbitmq-action/tree/master/rabbitmq
- gitee:https://gitee.com/ppbin/rabbitmq-action/tree/master/rabbitmq
推荐阅读
- Activiti(一)SpringBoot2集成Activiti6
- SpringBoot调用公共模块的自定义注解失效的解决
- 解决SpringBoot引用别的模块无法注入的问题
- Spring|Spring Boot 整合 Activiti6.0.0
- springboot使用redis缓存
- springboot整合数据库连接池-->druid
- SpringBoot中YAML语法及几个注意点说明
- springboot结合redis实现搜索栏热搜功能及文字过滤
- springboot中.yml文件的值无法读取的问题及解决
- SpringBoot整合MongoDB完整实例代码