ActiveMQ集群负载配置




1概述 针对海量消息所要求的横向扩展性和系统的高可用性,ActiveMQ提供了网络连接模式的集群功能。
简单的说,就是通过把多个不同的broker实例连接在一起,作为一个整体对外提供服务,从而提高整体对外的消息服务能力。
通过这种方式连接在一起的broker实例之间,可以共享队列和消费者列表,从而达到分布式队列的目的。

2自定义安装 2.1部署结构
Machine1:192.168.1.106
Machine2:192.168.1.107



2.2安装步骤 安装步骤如下:
1、解压;

tar -zxvf apache-activemq-5.9.0-bin.tar.gz



2、相关配置,编辑/conf/activemq.xml配置文件,
(采用staticBroker-Cluster配置方式实现):
可以参考官网:
http://activemq.apache.org/networks-of-brokers.html

1)设置数据回流配置:
节点下新增:

" producerFlowControl="false"memoryLimit="10mb" enableAudit="false">




2)在Machine1上设置对Machine2的引用
配置多个用逗号隔开host

or










3) 在Machine2上设置对Machine1的引用












2、启动:
执行/bin目录下activemq
分别启动Machine1和Machine2上的activemq

./activemq start &





3、查看启动后的日志:

如下日志显示,集群间Cluster相互连接,表示集群BrokerCluster集群配置成功。

ActiveMQ集群负载配置
文章图片


ActiveMQ集群负载配置
文章图片




2.3属性说明 2.3.1 networkConnector配置参数





URI的几个属性:
属性
默认值
描述
initialReconnectDelay
1000
重连之前等待的时间(ms) (如果useExponentialBackOff为 false)
maxReconnectDelay
30000
重连之前等待的最大时间(ms)
useExponentialBackOff
true
每次重连失败时是否增大等待时间
backOffMultiplier
2
增大等待时间的系数


networkConnector参数属性

属性
默认值
描述
name
bridge
名称
dynamicOnly
false
如果为true,持久订阅被激活时才创建对应的网路持久订阅。默认是启动时激活。
decreaseNetworkConsumerPriority
false
如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0.
networkTTL
1
消息和订阅在网络上通过的broker数量
conduitSubscriptions
true
多个网络消费者是否被当做一个消费者来对待。
excludedDestinations
empty
不通过网络转发的destination
dynamicallyIncludedDestinations
empty
通过网络转发的destinations,注意空列表代表所有的都转发。
staticallyIncludedDestinations
empty
匹配的都将通过网络转发-即使没有对应的消费者
duplex
false
如果为true,则既可消费又可生产消息到网络broker
prefetchSize
1000
设置网络消费者的prefetch size参数。必须大于0,因为网络消费者不能自己轮询消息。
suppressDuplicateQueueSubscriptions
false
(从5.3版本开始) 如果为true,重复的订阅关系一产生即被阻止。
bridgeTempDestinations
true
是否广播advisory messages来创建临时destination。
alwaysSyncSend
false
(从 5.6版本开始) 如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker。
staticBridge
false
(从5.6版本开始) 如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。


2.4场景说明 一个很有意思的场景是,broker1和broker2通过networkConnector连接。一些个consumers连接到broker1,消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是有一部分他们还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。怎么办呢?
办法就是从5.6版本destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发。

3Cluster原理 networkConnector的实现原理是基于ActiveMQ的公告消息(AdvisoryMessage)机制的(参见此处)。当broker2通过networkConnector、duplex方式指向broker1时,发生了什么呢?
假定broker1已经启动,这时候broker2开始启动。
1.broker2先启动自己的connector
2.然后使用一个vm的connector,创建一个connection,把自己作为一个client,连接到broker1。
3.通过订阅AdvisoryMessage,拿到相互的Consumer与相应的Queue列表。
至此,双方建立关系。

4集群场景测试 4.1集群测试 1、使用java客户端发送队列数据,

1)在生产者上连接61616发送数据;
2)在消费者上连接61617进行接收数据;


Product代码:

public class Producer { private static final String BROKER_URL = "tcp://192.168.1.106:61616"; private static final Boolean NON_TRANSACTED = false; private static final int NUM_MESSAGES_TO_SEND = 100; private static final long DELAY = 100; public static void main(String[] args) { String url = BROKER_URL; if (args.length > 0) { url = args[0].trim(); } ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("admin", "admin", url); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session =connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE); Destination destination =session.createQueue("test-queue"); MessageProducer producer =session.createProducer(destination); for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { TextMessage message =session.createTextMessage("Message#" + i); System.out.println("Sending message #" + i); producer.send(message); Thread.sleep(DELAY); } producer.close(); session.close(); } catch (Exception e) { System.out.println("Caught exception!"); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { System.out.println("Could not close an open connection..."); } } } } }





Consumer代码:

public class Consumer { // private static final String BROKER_URL ="tcp://192.168.1.106:61616"; private static final String BROKER_URL = "tcp://192.168.1.107:61617"; private static final Boolean NON_TRANSACTED = false; private static final long TIMEOUT = 20000; public static void main(String[] args) { String url = BROKER_URL; if (args.length > 0) { url = args[0].trim(); } System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s"); ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("admin", "admin", url); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session =connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE); Destination destination =session.createQueue("test-queue"); MessageConsumer consumer =session.createConsumer(destination); int i = 0; while (true) { Message message =consumer.receive(TIMEOUT); if (message != null) { if (message instanceof TextMessage) { String text =((TextMessage) message).getText(); System.out.println("Got " + i++ + ". message: " + text); } } else { break; } } consumer.close(); session.close(); } catch (Exception e) { System.out.println("Caught exception!"); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { System.out.println("Could not close an open connection..."); } } } } }





2、发送队列消息后,查看activeMQ网页信息:
ActiveMQ集群负载配置
文章图片






可以通过网页查看到已经存储了队列信息:



3、通过消费者取集群中另外一台机上的队列数据,可以取到
ActiveMQ集群负载配置
文章图片



至此,ActiveMQ集群配置完成

【ActiveMQ集群负载配置】

    推荐阅读