文章对SpringRedis和ActiveMQ的源码及使用简要分析一下,希望能起到抛砖引玉的作用。笔者之前在博客中写了一篇关于生产消费者模式的一个工程使用案例,10W级数据更新操作__生产消费者模式。其实发布订阅模式与生产消费者只是两种模式上的不同,在业务处理逻辑上还是比较相似的。前者数据结构是一对多队列模式,后者是点对点队列模式。Spring-Redis
Spring-Redis的队列监听容器为
org.springframework.data.redis.listener.RedisMessageListenerContainer
,继承关系如下。 文章图片
- 如何注入的监听?
注册监听对象和Topic保存在Map中,注入的set方法创建MessageListener-Topic映射,决定哪个Topic由哪个MessageListener去处理。一个Listener可对应多个Topic.
文章图片
- Topic如何创建
Topic是个接口,有两个实现类ChannelTopic和PatternTopic,一般Redis的发布订阅模式用ChannelTopic,只需要创建ChannelTopic注入到上述的Map即可。
文章图片
查看ChannelTopic的源码,发现没有set方法注入channelName,只有一个构造器方法。因此需要通过构造器方法注入这点要尤其注意。
文章图片
- MessageListener的创建和注入
org.springframework.data.redis.connection.MessageListener接口的实现类如下,显然这不是一个标准的JMS协议接口实现:
文章图片
注意:两个序列化方法要与Redis发布消息所用的序列化对象一致,下文会讲到。
delegate处理消息对象
文章图片
- 为什么delegate属性需要实现handleMessage(Object[] obj)方法?
反射调用的地方如下
文章图片
invoker实例初始化
文章图片
文章图片
文章图片
注意:delegate也可以实现MessageListener接口,实现onMessage方法处理消息。
- 序列化方法
关于在订阅消息在哪里将Message反序列化成Object,可以看调用serializer属性的源码。
文章图片
convertAndSend发布时channel的序列化是采用stringSerializer,该属性在RedisTemplate中已经初始化new了一个对象,因此默认不用注入该属性。
文章图片
关于RedisTemplate 的一些属性
文章图片
message(Value)的序列化是采用valueSerivalizer,默认为null,需要入注入。也可以注入defaultSerializer,也可以不用注入,RedisTemplate在Spring初始化时已经new JdkSerializationRedisSerializer方法了。所以最好保证在订阅方也采用这种方法序列化。
文章图片
- SpringXml配置使用Redis发布订阅
ActiveMQ
ActiveMQ消息队列有两种模式,点对点队列和一对多队列即发布订阅,ActiveMQ是一种实现了标准的JMS协议的中间件。
- 消息订阅
常用订阅监听容器类为org.springframework.jms.listener.DefaultMessageListenerContainer
它实现的接口和继承的类关系如下图
文章图片
DefaultMessageListenerContainer容器主要维护一些关于消息队列的配置和处理线程,消息处理类和序列化主要在其超类org.springframework.jms.listener.AbstractMessageListenerContainer
中进行定义。主要有两个参数destination和messageListener,前者是JMS目的地也可以理解为信息的来源(队列的实例),后者是处理消息的类,这两个参数分别需要实现javax.jms.Topic/javax.jms.Queue
接口、javax.jms.MessageListener/org.springframework.jms.listener.SessionAwareMessageListener
接口。
文章图片
- 在ActiveMQ中,队列的JMS接口都已经有实现,所以大部分情况直接拿来就用,注入队列的名字即可。
Topic的实现类为org.apache.activemq.command.ActiveMQTopic
;
Queue的实现类为org.apache.activemq.command.ActiveMQQueue
;
javax.jms.MessageListener
或org.springframework.jms.listener.SessionAwareMessageListener
作为回调接口,则需要自己根据具体的业务逻辑去实现。当然也可选择其中的一个实现类org.springframework.jms.listener.adapter.MessageListenerAdapter
去实现自己的业务逻辑,这个类可参考上述Redis发布订阅模式,MessageListenerAdapter中有诸多默认的属性如下,不过这个使用起来比较麻烦,需要设置destinationName或其它,不建议使用。 文章图片
- 消息发布
org.springframework.jms.core.JmsTemplate
的send方法。配置如下两个方法即可连接到ActiveMQ注册地址。
文章图片
- JmsTemplate的继承实现关系
文章图片
- 【Spring Redis与ActiveMQ发布订阅模式源码分析】ActiveMQ的SpringXml配置使用