1.Clustering 最典型的JMS集群模型是,由一组JMS代理器构成,单个客户端连接他们中的一个,如果该broker宕机,则会自动连接另一个broker。我们通过在客户端连接中使用协议栈failover://实现。参考该Failover Transport Reference页面查看具体配置信息。
我们支持通过 static discovery 或 dynamic discovery自动检测broker的机制。这样客户端能自动的连接到一组broker同时broker也能发现和连接其他broker从而形成更大的网络。
1.1Networks of brokers 如果在集群中,只是使用client/server拓扑,那么就可能存在有些broker的消息一直在堆积,但是却没有客户端连接消费消息的情况。因此ActiveMQ提供了Networks of brokers的机制,它支持分布式的queues和topics。
主要有以下两种配置xml的方式:
1.使用硬编码的方式在networkConnector 元素中增加
【ActiveMQ 分析系列(三)】
2.使用Discovery方式去检测broker
更多的配置信息可以参见 Networks of Brokers
1.2Master SlaveNetworks of brokers可以解决集群扩展的问题,但对于消息在同一时刻任然存在于单个broker上,如果这个broker宕机了,只能等他重新启动才能发送消息(如果消息非持久,则会丢失)。Master Slave的目的就是将消息复制到slave broker上,当出现故障时可以迅速的切换到slave上。
目前支持3种slave方式
Master Slave Type |
Requirements |
Pros |
Cons |
---|---|---|---|
Shared File System Master Slave |
A shared file system such as a SAN |
Run as many slaves as required. Automatic recovery of old masters |
Requires shared file system |
JDBC Master Slave |
A Shared database |
Run as many slaves as required. Automatic recovery of old masters |
Requires a shared database. Also relatively slow as it cannot use the high performance journal |
Replicated LevelDB Store |
ZooKeeper Server |
Run as many slaves as required. Automatic recovery of old masters. Very fast. |
Requires a ZooKeeper server. |
2.消息消费端特性 2.1Consumer Dispatch Async 分发异步机制broker将消息发送给客户端默认是异步的,异步就意味着多线程和上下文切换的消耗。但异步能将资源最大利用,因为不同的客户端访问的环境可能不同,速度有快有慢。如果认为客户端都够快,且不想将资源消耗在上下文切换上,可以设置为同步。如下:
//connect级别
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
//connectFactory级别
((ActiveMQConnection)connection).setDispatchAsync(false);
//消息级别
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
2.2Consumer Priority消息的发送是可以有权重级别的。在消费者端必须通过创建的时候设置级别。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
级别可以从0~127,高级别能接受低级别的消息。如果在生产者端设置的级别是4,那么3级别的消费者不能马上收到该消息,而在5、6之间,broker会把消息优先发送给6。只有在消息堆积超过broker队列或缓存极限时,才会把消息分发给低级别的消费者。
2.3 Exclusive Consumer 在集群情况下,同一个queue的消息可能被不同的客户端消费,但有些环境里需要消息链上的多条消息被连续处理,Exclusive Consumer特性提供了对这一需求的支持。设置了该特性的队列,会将消息消费的行为绑定到单一的客户端上,除非发生错误产生失败转移到另一个客户端,否则将一直绑定下去。 这在集群里特别的适用。若要设置,可以如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
2.4 Manage Durable Subscribers 某些主题消息因为客户端在订阅后却长时间离线,而一直进驻内存,影响系统内存量及稳定性。有两种方式可以设置主题消息的过期时间,在配置文件中: 1.在 destinationPolicy标签中设置参数
" expireMessagesPeriod="300000"/>
2.在broker 标签中增加属性
2.5 Message GroupsMessage Groups就类似于并行的多个Exclusive Consumer队列。这些队列共用一个JMSXGroupID,同一JMSXGroupID的消息将固定发送给单一客户端,除非发生故障转移,或groups被关闭。可以如此使用:
Mesasge message = session.createTextMessage("hey ");
message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
...
producer.send(message);
2.6 Redelivery Policy 转发策略。
Property |
Default Value |
Description |
---|---|---|
collisionAvoidanceFactor |
0.15 |
The percentage of range of collision avoidance if enabled |
maximumRedeliveries |
6 |
Sets the maximum number of times a message will be redelivered before it is considered apoisoned pill and returned to the broker so it can go to a Dead Letter Queue (use value -1 to define infinite number of redeliveries) |
maximumRedeliveryDelay |
-1 |
Sets the maximum delivery delay that will be applied if the useExponentialBackOff option is set. (use value -1 to define that no maximum be applied) (v5.5) |
initialRedeliveryDelay |
1000L |
The initial redelivery delay in milliseconds |
redeliveryDelay |
1000L |
The delivery delay if initialRedeliveryDelay is 0 (v5.4) |
useCollisionAvoidance |
false |
Should the redelivery policy use collision avoidance |
useExponentialBackOff |
false |
Should exponential back-off be used (i.e. to exponentially increase the timeout) |
backOffMultiplier |
5 |
The back-off multiplier |
使用方式如下:
ActiveMQConnection connection ...// Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);
2.7 Retroactive Consumer 该特性使得订阅了某topic的客户端,可以追溯之前已发送或错过的消息。
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
consumer = session.createConsumer(topic);
但该特性在集群环境下的activemq不能保证一贯性。
2.8 JMS Selectors 选择器可以支持对消息的筛选。 Selectors are a way of attaching a filter to a subscription to perform content based routing. Selectors are defined using SQL 92 syntax and typically apply to message headers; whether the standard properties available on a JMS message or custom headers you can add via the JMS code.
JMSType = 'car' AND color = 'blue' AND weight > 2500
推荐阅读
- Springboot|SpringBoot集成ActiveMQ实例详解
- activeMQ|activeMQ指南针_发布activeMQ的可视化运行维护工具(activeMQ_Spanner)
- ActiveMQ系列—ActiveMQ性能优化(中1)(处理规则和优化)
- NodeJs|基于nodejs和activeMQ的消息推送
- activemq|调试activemq源码
- JMS学习七(ActiveMQ之Topic的持久订阅)