Destination高级特性

长恨人生不如水,等闲平地起波澜。 —— 刘禹锡
1. Wildcards(通配符) ??Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
??ActiveMQ支持以下三种通配符:
  • ".":用于作为路径上名字间的分隔符
  • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
  • "*":用于作为路径上任何名字。
??举例来说,如有以下两个Destination:
??PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
??PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)
??1. PRICE.> :匹配任何产品的价格变动
??2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
??3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
??4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
??JAVA代码示例:
??生产者:
// 实例化连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false"); // 通过连接工厂获取连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE"); // 创建生产者 MessageProducer messageProducer = session.createProducer(destination); for (int i = 1; i <= 10; i++) { TextMessage textMessage = session.createTextMessage(message); messageProducer.send("Mac Air价格:"+ i * 1000); System.out.println("发送消息 - " + textMessage.getText()); } session.commit();

??消费者:
// 实例化连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false"); // 通过连接工厂获取连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination destination = session.createQueue("PRICE.COMPUTER.>"); // 创建消费者 MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { try { System.out.println("收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } });

??生产者发送消息日志:
发送消息:Mac Air价格:1000 发送消息:Mac Air价格:2000 发送消息:Mac Air价格:3000 发送消息:Mac Air价格:4000 发送消息:Mac Air价格:5000 发送消息:Mac Air价格:6000 发送消息:Mac Air价格:7000 发送消息:Mac Air价格:8000 发送消息:Mac Air价格:9000 发送消息:Mac Air价格:10000

??消费者接收到的消息日志:
收到的消息:Mac Air价格:1000 收到的消息:Mac Air价格:2000 收到的消息:Mac Air价格:3000 收到的消息:Mac Air价格:4000 收到的消息:Mac Air价格:5000 收到的消息:Mac Air价格:6000 收到的消息:Mac Air价格:7000 收到的消息:Mac Air价格:8000 收到的消息:Mac Air价格:9000 收到的消息:Mac Air价格:10000

??从消费者的接收日志来看,当消费者使用通配符队列时,是能正常接收消息的。
通配符中是为消费者服务的。即:通配符只能配置在消费端。
2. 组合列队(Composite Destinations) ??组合列队即通过一个虚拟的Destination代表多个Destination,这样就可以通过该虚拟的Destination同时向多个Destination发送消息。
??实现虚拟Destination可通过以下方式实现:
??(1) 客户端方式
??生产者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,PRICE.COMPUTER.TMALL.APPLE"); // 多个Destination使用,分隔

??消费者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE"); // 或者 Destination destination = session.createQueue("PRICE.COMPUTER.TMALL.APPLE"); // 以上两个消费者都能接收到生产者的消息

若使用不同类型的destination,那么需要加上前缀如queue:// 或topic://。如:Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,topic://PRICE.COMPUTER.TMALL.APPLE");
??(2) 配置方式
??在activemq.xml的broker中添加以下配置:

??然后在生产者中,直接向队列PRICE.COMPUTER中发送消息。那么监听PRICE.COMPUTER.JD.APPLE或PRICE.COMPUTER.TMALL.APPLE的消费者都能同时接收到生产者发送的消息。
注意:修改完配置文件后,须重启Broker服务,不然配置是不会生效的!
3. 配置启动队列 ??若需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

4. 队列选项 ??队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
?? 1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
?? 2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
??3:consumer.noLocal :默认false。
?? 4:consumer.dispatchAsync :是否异步分发 ,默认true。
??5:consumer.retroactive:是否为回溯消费者 ,默认false。
?? 6:consumer.selector:Jms的Selector,默认null。
?? 7:consumer.exclusive:是否为独占消费者 ,默认false。
?? 8:consumer.priority:设置消费者的优先级,默认0。
??示例如下:
queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20"); consumer = session.createConsumer(queue);

5. 虚拟Destination ??ActiveMQ支持虚拟Destination有以下两种方式:
??(1) 虚拟主题
??(2) 组合Destinations(前面已介绍过)
5.1 为何使用虚拟主题 ??ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
??(1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。
??(2) 由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
??在ActiveMQ,可以通过虚拟主题来解决以上两个问题。
5.2 虚拟主题的使用 ??对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:
Topic destination = session.createTopic("VirtualTopic.Mobille");

??对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:
??应用A的消费者:
Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mobile");

应用A的消费者启动两个及以上,下面称为A1、A2.
??应用B的消费者:
Destination destination = session.createQueue("Consumer.B.VirtualTopic.Mobile");

??应用A的消费者和应用B的消费者依次启动订阅,发布者发送消息。
??应用A消费者A1消费消息日志:
收到的消息:ActiveMQ 发送的消息:10 收到的消息:ActiveMQ 发送的消息:12 收到的消息:ActiveMQ 发送的消息:14 收到的消息:ActiveMQ 发送的消息:16 收到的消息:ActiveMQ 发送的消息:18

??应用A消费者A2消费消息日志:
收到的消息:ActiveMQ 发送的消息:11 收到的消息:ActiveMQ 发送的消息:13 收到的消息:ActiveMQ 发送的消息:15 收到的消息:ActiveMQ 发送的消息:17 收到的消息:ActiveMQ 发送的消息:19

??应用B消费者消息日志:
收到的消息:ActiveMQ 发送的消息:10 收到的消息:ActiveMQ 发送的消息:11 收到的消息:ActiveMQ 发送的消息:12 收到的消息:ActiveMQ 发送的消息:13 收到的消息:ActiveMQ 发送的消息:14 收到的消息:ActiveMQ 发送的消息:15 收到的消息:ActiveMQ 发送的消息:16 收到的消息:ActiveMQ 发送的消息:17 收到的消息:ActiveMQ 发送的消息:18 收到的消息:ActiveMQ 发送的消息:19

??从以上消费者日志可以看出,应用A的A1、A2两个消费者共同消费了10条消息,而应用B的消费者也同样消费了10条消息,但应用B只要一个消费者。
注意:若是activemq.xml中配置了destinationInterceptors,则需要将destinationInterceptors删除,然后重启Broker服务。
【Destination高级特性】??虚拟主题的前缀默认是VirtualTopic,若是需要修改,可通过以下方式修改:

    推荐阅读