SpringBoot|SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用
目录
- Redis命令行下使用发布订阅
- publish 发布
- subscribe 订阅
- SpringBoot中使用Redis的发布订阅功能
- 发布者
- 订阅者
- 消息监听容器
Redis命令行下使用发布订阅
publish 发布
发布者通过以下命令可以往指定channel发布message
redis> publish channel message
subscribe 订阅
订阅者通过以下命令可以订阅一个或多个频道,如果频道不存在则会创建
redis> subscribe channel [channel ...]
对于redis的发布订阅的命令就这么简单。那么接下来我们在springboot中如何使用发布订阅的功能呢?
SpringBoot中使用Redis的发布订阅功能
添加依赖配置redis信息和连接池什么的就不说了,如果添加的有commons-pool2依赖的话,会自动帮我们配置redis连接池的
发布者
相对于订阅者来说,发布者的实现方式很简单,以下方式就可以往channel中发送message了。
@Resourceprivate RedisTemplate redisTemplate; public void publish(){// 使用高级的redisTemplateredisTemplate.convertAndSend("channel","message"); // 使用低级的connection 实际上redisTemplate的底层就是使用的下面的方式redisTemplate.execute(new RedisCallback
订阅者
订阅者因为涉及到连接、线程等 所以内容相对会多一点
@Resourceprivate RedisTemplate redisTemplate; public void subscribe() {redisTemplate.execute(new RedisCallback
如何取消订阅呢?从刚才的map里取到连接
RedisConnection the = map.get("connection"); Subscription subscription = the.getSubscription(); subscription.unsubscribe();
消息监听容器
上面的那种订阅为低级订阅,由于连接在调用subscribe的时候会导致当前线程阻塞,这种方式需要对每个监听器连接和线程管理,所以spring提供了RedisMessageListenerContainer类来帮我们完成这些工作。
RedisMessageListenerContainer顾名思义可以知道它是一个消息监听容器
详情请参考官方文档
如何实现
@Configurationpublic class DefaultMessageListenerContainerConfig {@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); // 官方推荐我们使用自定义的线程池或者使用TaskExecutorcontainer.setTaskExecutor(executor()); container.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody())); }}, new ChannelTopic("message")); return container; }@Beanpublic TaskExecutor executor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.setQueueCapacity(100); executor.initialize(); return executor; }}
这个时候我们在redis命令行内使用 publish channel message 的时候,我们的spring程序就可以订阅到消息了。
再说下 MessageListenerAdapter
我们可以通过 MessageListenerAdapter 消息接收者包装进去,消息接收者不会和redis有任何耦合。
官方文档给了spring传统的xml的方式配置的,下面我给出基于configuration配置的代码
public interface MessageDelegate {void handleMessage(String message); }public class DefaultMessageDelegate implements MessageDelegate {@Overridepublic void handleMessage(String message) {System.out.println(message); }}@Configurationpublic class MessageListenerContainerConfig {@Autowiredprivate DefaultMessageDelegate defaultMessageDelegate; @Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory,MessageListenerAdapter messageListenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.setTaskExecutor(executor()); Map> map = new HashMap<>(); List channelTopics = new ArrayList<>(); ChannelTopic channelTopic = new ChannelTopic("message"); channelTopics.add(channelTopic); map.put(messageListenerAdapter, channelTopics); container.setMessageListeners(map); return container; }@Beanpublic TaskExecutor executor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.setQueueCapacity(100); executor.initialize(); return executor; }@Beanpublic MessageListenerAdapter messageListenerAdapter() {// handleMessage 参数消息来的时候要调用的方法 默认是 handleMessagereturn new MessageListenerAdapter(defaultMessageDelegate, "handleMessage"); }}
如果我们要在程序运行时添加订阅或者取消订阅的时候该怎么办呢?
我们需要提前准备好消息侦听器,添加的时候把侦听器注入到消息容器
取消的时候就调用消息容器的remove方法把侦听器删除掉即可。
【SpringBoot|SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用】到此这篇关于SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用的文章就介绍到这了,更多相关SpringBoot Redis发布订阅模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- Activiti(一)SpringBoot2集成Activiti6
- SpringBoot调用公共模块的自定义注解失效的解决
- 解决SpringBoot引用别的模块无法注入的问题
- 图文小编《杨浦、成毅》为你发布!无价之宝随意摆放的公园
- springboot使用redis缓存
- 用npm发布一个包的教程并编写一个vue的插件发布
- 【译】Rails|【译】Rails 5.0正式发布(Action Cable,API模式等)
- (1)redis集群原理及搭建与使用(1)
- springboot整合数据库连接池-->druid
- 运用flutter|运用flutter 构建一个发布版(release)APK