于今腐草无萤火,终古垂杨有暮鸦。这篇文章主要讲述最通俗易懂的Redis发布订阅及代码实战相关的知识,希望能为你提供帮助。
发布订阅简介
除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:
文章图片
Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。
发布订阅相关命令
在Redis中,发布订阅相关命令有:
- 发布消息
- 订阅频道
- 取消订阅
- 按照模式订阅
- 按照模式取消订阅
- 查询订阅信息
publish
,语法是:publish 频道名称 消息
比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:
>
publish channel:one-more-study:demo "I am One More Study."
(integer) 0
返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。
订阅消息订阅消息的命令是
subscribe
,订阅者可以订阅一个或者多个频道,语法是:subscribe 频道名称 [频道名称 ...]
比如,订阅一个channel:one-more-study:demo频道,命令如下:
>
subscribe channel:one-more-study:demo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:
1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."
同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。
新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。
取消订阅取消订阅的命令是
unsubscribe
,可以取消一个或者多个频道的订阅,语法是:unsubscribe [频道名称 [频道名称 ...]]
比如,取消订阅channel:one-more-study:demo频道,命令如下:
>
unsubscribe channel:one-more-study:demo
1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。
按模式订阅消息按模式订阅消息的命令是
psubscribe
,订阅一个或多个符合给定模式的频道,语法是:psubscribe 模式 [模式 ...]
每个模式以 作为匹配符,比如 channel 匹配所有以 channel 开头的频道,命令如下:
>
psubscribe channel:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:
1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."
返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。
按模式取消订阅按模式取消订阅的命令是
punsubscribe
,可以取消一个或者多个模式的订阅,语法是:punsubscribe [模式 [模式 ...]]
每个模式以 作为匹配符,比如 channel: 匹配所有以 channel 开头的频道,命令如下:
1>
punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。
查询订阅信息 查看活跃频道活跃频道指的是至少有一个订阅者的频道,语法是:
pubsub channels [模式]
比如:
>
pubsub channels
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"
>
pubsub channels *demo
1) "channel:one-more-study:demo"
2) "channel:demo"
>
pubsub channels *one-more-study*
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看频道订阅数
pubsub numsub [频道名称 ...]
比如:
>
pubsub numsub channel:one-more-study:demo
1) "channel:one-more-study:demo"
2) (integer) 1
查看模式订阅数
>
pubsub numpat
(integer) 1
代码实战
光说不练假把式,我们使用java语言写一个简单的发布订阅示例。
Jedis集群示例Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。
package onemore.study;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.Set;
/**
* Jedis集群
*
* @author 万猫学社
*/
public enum Cluster
INSTANCE;
//为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。
private final String hostAndPorts = "192.168.0.60:6379;
192.168.0.61:6379;
192.168.0.62:6379";
private JedisCluster jedisCluster;
Cluster()
JedisPoolConfig poolConfig = new JedisPoolConfig();
//最大连接数
poolConfig.setMaxTotal(20);
//最大空闲数
poolConfig.setMaxIdle(10);
//最小空闲数
poolConfig.setMinIdle(2);
//从jedis连接池获取连接时,校验并返回可用的连接
poolConfig.setTestOnBorrow(true);
//把连接放回jedis连接池时,校验并返回可用的连接
poolConfig.setTestOnReturn(true);
Set<
HostAndPort>
nodes = new HashSet<
>
();
String[] hosts = hostAndPorts.split(";
");
for (String hostport : hosts)
String[] ipport = hostport.split(":");
String ip = ipport[0];
int port = Integer.parseInt(ipport[1]);
nodes.add(new HostAndPort(ip, port));
jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
public JedisCluster getJedisCluster()
return jedisCluster;
发布者示例
package onemore.study;
import redis.clients.jedis.JedisCluster;
/**
* 发布者
*
* @author 万猫学社
*/
public class Publisher implements Runnable
private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";
@Override
public void run()
JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
for (int i = 1;
i <
= 3;
i++)
String message = "第" + i + "消息";
System.out.println(Thread.currentThread().getName() + " 发布:" + message);
jedisCluster.publish(CHANNEL_NAME, message);
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("------------------");
jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
订阅者示例
package onemore.study;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;
/**
* 订阅者
*
* @author 万猫学社
*/
public class Subscriber implements Runnable
private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";
private final JedisPubSub jedisPubSub = new JedisPubSub()
@Override
public void onMessage(String channel, String message)
System.out.println(Thread.currentThread().getName() + " 接收:" + message);
if (QUIT_COMMAND.equals(message))
unsubscribe(CHANNEL_NAME);
;
@Override
public void run()
JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
综合示例
package onemore.study;
public class App
public static void main(String[] args) throws InterruptedException
//创建3个订阅者
new Thread(new Subscriber()).start();
new Thread(new Subscriber()).start();
new Thread(new Subscriber()).start();
Thread.sleep(1000);
//创建发布者
new Thread(new Publisher()).start();
【最通俗易懂的Redis发布订阅及代码实战】运行结果如下:
Thread-6 发布:第1消息
Thread-0 接收:第1消息
Thread-1 接收:第1消息
Thread-2 接收:第1消息
------------------
Thread-6 发布:第2消息
Thread-0 接收:第2消息
Thread-1 接收:第2消息
Thread-2 接收:第2消息
------------------
Thread-6 发布:第3消息
Thread-0 接收:第3消息
Thread-2 接收:第3消息
Thread-1 接收:第3消息
------------------
Thread-0 接收:quit
Thread-1 接收:quit
Thread-2 接收:quit
推荐阅读
- RT-Thread快速入门-内核移植
- JAVA线程详解
- PHP 零基础入门笔记(安装PHP)
- #yyds干货盘点#CyclicBarrier详解
- Kubernetes 1.15 安装及组件关系(前期准备工作篇)
- 网络安全最全面试题
- 对功能测试的一些思考
- MySQL千万数据方案调研,一不小心直接打挂我系统
- Openharmony3.1编译Hi3516DV300标准版系统体验及img档分享