最通俗易懂的Redis发布订阅及代码实战

于今腐草无萤火,终古垂杨有暮鸦。这篇文章主要讲述最通俗易懂的Redis发布订阅及代码实战相关的知识,希望能为你提供帮助。
发布订阅简介
除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:

最通俗易懂的Redis发布订阅及代码实战

文章图片

Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。
发布订阅相关命令
在Redis中,发布订阅相关命令有:
  1. 发布消息
  2. 订阅频道
  3. 取消订阅
  4. 按照模式订阅
  5. 按照模式取消订阅
  6. 查询订阅信息
发布消息发布消息的命令是publish,语法是:
publish 频道名称 消息

比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:
> publish channel:one-more-study:demo "I am One More Study." (integer) 0

返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。
订阅消息订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:
subscribe 频道名称 [频道名称 ...]

比如,订阅一个channel:one-more-study:demo频道,命令如下:
> subscribe channel:one-more-study:demo Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel:one-more-study:demo" 3) (integer) 1

返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:
1) "message" 2) "channel:one-more-study:demo" 3) "I am One More Study."

同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。
新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。
取消订阅取消订阅的命令是unsubscribe,可以取消一个或者多个频道的订阅,语法是:
unsubscribe [频道名称 [频道名称 ...]]

比如,取消订阅channel:one-more-study:demo频道,命令如下:
> unsubscribe channel:one-more-study:demo 1) "unsubscribe" 2) "channel:one-more-study:demo" 3) (integer) 0

返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。
按模式订阅消息按模式订阅消息的命令是psubscribe,订阅一个或多个符合给定模式的频道,语法是:
psubscribe 模式 [模式 ...]

每个模式以 作为匹配符,比如 channel 匹配所有以 channel 开头的频道,命令如下:
> psubscribe channel:* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "channel*" 3) (integer) 1

返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:
1) "pmessage" 2) "channel*" 3) "channel:one-more-study:demo" 4) "I am One More Study."

返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。
按模式取消订阅按模式取消订阅的命令是punsubscribe,可以取消一个或者多个模式的订阅,语法是:
punsubscribe [模式 [模式 ...]]

每个模式以 作为匹配符,比如 channel: 匹配所有以 channel 开头的频道,命令如下:
1> punsubscribe channel:* 1) "punsubscribe" 2) "channel:*" 3) (integer) 0

返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。
查询订阅信息 查看活跃频道活跃频道指的是至少有一个订阅者的频道,语法是:
pubsub channels [模式]

比如:
> pubsub channels 1) "channel:one-more-study:test" 2) "channel:one-more-study:demo" 3) "channel:demo" > pubsub channels *demo 1) "channel:one-more-study:demo" 2) "channel:demo" > pubsub channels *one-more-study* 1) "channel:one-more-study:test" 2) "channel:one-more-study:demo"

查看频道订阅数
pubsub numsub [频道名称 ...]

比如:
> pubsub numsub channel:one-more-study:demo 1) "channel:one-more-study:demo" 2) (integer) 1

查看模式订阅数
> pubsub numpat (integer) 1

代码实战
光说不练假把式,我们使用java语言写一个简单的发布订阅示例。
Jedis集群示例Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。
package onemore.study; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.util.HashSet; import java.util.Set; /** * Jedis集群 * * @author 万猫学社 */ public enum Cluster INSTANCE; //为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。 private final String hostAndPorts = "192.168.0.60:6379; 192.168.0.61:6379; 192.168.0.62:6379"; private JedisCluster jedisCluster; Cluster() JedisPoolConfig poolConfig = new JedisPoolConfig(); //最大连接数 poolConfig.setMaxTotal(20); //最大空闲数 poolConfig.setMaxIdle(10); //最小空闲数 poolConfig.setMinIdle(2); //从jedis连接池获取连接时,校验并返回可用的连接 poolConfig.setTestOnBorrow(true); //把连接放回jedis连接池时,校验并返回可用的连接 poolConfig.setTestOnReturn(true); Set< HostAndPort> nodes = new HashSet< > (); String[] hosts = hostAndPorts.split("; "); for (String hostport : hosts) String[] ipport = hostport.split(":"); String ip = ipport[0]; int port = Integer.parseInt(ipport[1]); nodes.add(new HostAndPort(ip, port)); jedisCluster = new JedisCluster(nodes, 1000, poolConfig); public JedisCluster getJedisCluster() return jedisCluster;

发布者示例
package onemore.study; import redis.clients.jedis.JedisCluster; /** * 发布者 * * @author 万猫学社 */ public class Publisher implements Runnable private final String CHANNEL_NAME = "channel:one-more-study:demo"; private final String QUIT_COMMAND = "quit"; @Override public void run() JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster(); for (int i = 1; i < = 3; i++) String message = "第" + i + "消息"; System.out.println(Thread.currentThread().getName() + " 发布:" + message); jedisCluster.publish(CHANNEL_NAME, message); try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace(); System.out.println("------------------"); jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);

订阅者示例
package onemore.study; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPubSub; /** * 订阅者 * * @author 万猫学社 */ public class Subscriber implements Runnable private final String CHANNEL_NAME = "channel:one-more-study:demo"; private final String QUIT_COMMAND = "quit"; private final JedisPubSub jedisPubSub = new JedisPubSub() @Override public void onMessage(String channel, String message) System.out.println(Thread.currentThread().getName() + " 接收:" + message); if (QUIT_COMMAND.equals(message)) unsubscribe(CHANNEL_NAME); ; @Override public void run() JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster(); jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);

综合示例
package onemore.study; public class App public static void main(String[] args) throws InterruptedException //创建3个订阅者 new Thread(new Subscriber()).start(); new Thread(new Subscriber()).start(); new Thread(new Subscriber()).start(); Thread.sleep(1000); //创建发布者 new Thread(new Publisher()).start();

【最通俗易懂的Redis发布订阅及代码实战】运行结果如下:
Thread-6 发布:第1消息 Thread-0 接收:第1消息 Thread-1 接收:第1消息 Thread-2 接收:第1消息 ------------------ Thread-6 发布:第2消息 Thread-0 接收:第2消息 Thread-1 接收:第2消息 Thread-2 接收:第2消息 ------------------ Thread-6 发布:第3消息 Thread-0 接收:第3消息 Thread-2 接收:第3消息 Thread-1 接收:第3消息 ------------------ Thread-0 接收:quit Thread-1 接收:quit Thread-2 接收:quit


    推荐阅读