Redis流教程(20分钟)

本文概述

  • 1.简介
  • 2.安装
  • 3.用法
  • 4.总结
1.简介【Redis流教程(20分钟)】Redis 5中有一个关键的新功能:流。
流是日志形式的存储结构, 你可以在其中添加数据。它将为每个数据生成一个时间戳ID。而且流还具有读取数据的便捷模型。
因此, 流适用于消息队列和时间序列存储。
2.安装我们需要使用最新版本的Redis 5.0, 在这里我们使用docker redis容器:
docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3

Redis客户端:
docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379

启动后将进入交互式命令行:
redis:6379>

3.用法3.1向流添加元素
流元素可以是一个或多个键值对。让我们向流中添加元素:
redis:6379> XADD mystream * sensor-id 1234 temperature 19.81531989605376-0

从上面我们可以知道:
  • mystream是流的关键;
  • *位置的参数为元素ID, *表示系统自动生成元素ID。
  • 添加的元素包含2个键值对, 传感器ID 1234和温度19.8;
  • 返回的值是新添加的元素的ID, 由时间戳和递增数字组成。
你还可以获取流中的元素数:
redis:6379> XLEN mystream(integer) 1

3.2范围查询
当我们要使用范围查询时, 我们需要指定开始和结束ID, 这等效于指定时间范围:
redis:6379> XRANGE mystream 1531989605376 15319896053771) 1) 1531989605376-0  2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "19.8"

你可以将-用于最小的ID, 而将+用于最大的ID:
redis:6379> XRANGE mystream - +1) 1) 1531989605376-0  2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "19.8"

当返回的元素太多时, 你可以限制返回结果的数量, 就像查询数据库并通过COUNT参数指定它时分页一样:
redis:6379> XRANGE mystream - + COUNT 21) 1) 1531989605376-0  2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "19.8"

你还可以使用XREVRANGE命令来反向查询, 其用法与XRANGE相同。
3.3聆听流的新元素
redis:6379> XREAD COUNT 2 STREAMS mystream 01) 1) "mystream"  2) 1) 1) 1531989605376-0        2) 1) "sensor-id"            2) "1234"            3) "temperature"            4) "19.8"

STREAMS之后的mystream指定目标流的密钥; 0是最小的ID, 我们需要获得大于指定流中指定ID的元素; COUNT是指我们要获取的元素数。
可以一起指定多个流, 例如STREAMS mystream otherstream 0 0。
3.3.1阻止监听器如果你在客户端1中执行以下操作:
redis:6379> XREAD BLOCK 0 STREAMS mystream $

它将进入等待状态。
如果你向客户端2添加元素:
redis:6379> XADD mystream * test 1

刚添加的元素将显示在客户端1中:
1) 1) "mystream"  2) 1) 1) 1531994510562-0        2) 1) "test"            2) "1"

0是指定的超时, 因此0表示从不超时; $表示流中的最大ID。
3.4消费群体
当流的数量非常大时, 或者当使用者处理非常耗时时, 如果只有一个使用者, 它将承受更大的压力。因此, redis流提供了使用者组的概念, 从而允许多个使用者处理同一个流以实现负载平衡。
例如, 如果有3个使用者C1, C2和C3, 并且流中有7个消息元素, 则这些使用者的分配为:
1 -> C12 -> C23 -> C34 -> C15 -> C26 -> C37 -> C1

3.4.1创建消费者组
redis:6379> XGROUP CREATE mystream mygroup01 $OK

这里为流mystream创建了一个消费者组, 该组的名称为mygroup01; $表示读取当前最大ID之后的元素。
3.4.2添加测试数据添加一些新数据:
redis:6379> XADD mystream * message apple1531999977149-0redis:6379> XADD mystream * message orange1531999980272-0redis:6379> XADD mystream * message strawberry1531999983493-0redis:6379> XADD mystream * message apricot1531999988458-0redis:6379> XADD mystream * message banana1531999991782-0

3.4.3通过消费者组读取数据
redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream > 1) 1) "mystream"  2) 1) 1) 1531999977149-0        2) 1) "message"            2) "apple"

爱丽丝(Alice)是该组成员的名称, > 表示该??组的成员到目前为止尚未读取数据。
如你所见, 你无需预先创建组成员, 因为它们将在首次使用时自动创建。
然后让我们创建另一个成员以读取数据:
redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream > 1) 1) "mystream"  2) 1) 1) 1531999980272-0        2) 1) "message"            2) "orange"

3.4.4消费历史
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 01) 1) "mystream"  2) 1) 1) 1531999977149-0        2) 1) "message"            2) "apple"

此处最后指定的ID为0, 因此你可以获得未决的历史记录数据(已使用但未发送消耗确认的数据), 并且可以帮助恢复后的工作。
3.4.5消费确认
redis:6379> XACK mystream mygroup01 1531999977149-0(integer) 1

1531999977149-0是Alice消耗的苹果数据。现在让我们检查爱丽丝的消费历史记录:
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 01) 1) "mystream"  2) (empty list or set)

它已经是空的。
3.4.6故障处理因此, 当使用者遇到问题时, 你可以获取尚未确认的消息数据, 然后将其恢复。这是一种安全机制, 但是如果不能再恢复有问题的使用者, 该怎么办?有什么方法可以处理尚未确认的消息数据?
redis流中提供了一个解决方案来处理这种情况:
  1. 找出所有已传递但未确认的消息数据;
  2. 更改这些数据的所有者。
现在, 它允许新的使用者处理数据。
列出未处理的数据:
redis:6379> XPENDING mystream mygroup01 - + 101) 1) 1531999980272-0  2) "Bob"  3) (integer) 45126376  4) (integer) 22) 1) 1531999983493-0  2) "Tom"  3) (integer) 867475  4) (integer) 1

你会看到有2个未处理的数据, 并且列出了每个数据的ID, 所有者, 此消息的空闲时间(以毫秒为单位)以及该消息的传递次数。
更改所有者:
redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-01) 1) 1531999980272-0  2) 1) "message"      2) "orange"2) 1) 1531999983493-0  2) 1) "message"      2) "strawberry"

它将指定2个ID的消息传递给Gates。 3600是指最短的空闲时间, 它将空闲时间大于3600的指定消息分配给Gates。请注意, 盖茨是一个新消费者。之前尚未声明过。
检查到目前为止盖茨尚未处理的数据:
redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 01) 1) "mystream"  2) 1) 1) 1531999980272-0        2) 1) "message"            2) "orange"      2) 1) 1531999983493-0        2) 1) "message"            2) "strawberry"

有2个新分配的数据。
3.5查看相关信息
查看基本信息:
redis:6379> XINFO STREAM mystream1) length2) (integer) 153) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 29) first-entry10) 1) 1531989605376-0    2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "19.8"11) last-entry12) 1) 1531999991782-0    2) 1) "message"      2) "banana"

查看消费者组信息:
redis:6379> XINFO GROUPS mystream1) 1) name  2) "mygroup"  3) consumers  4) (integer) 3  5) pending  6) (integer) 52) 1) name  2) "mygroup01"  3) consumers  4) (integer) 4  5) pending  6) (integer) 2

查看有关组中消费者的信息:
redis:6379> XINFO CONSUMERS mystream mygroup1) 1) name  2) "Alice"  3) pending  4) (integer) 3  5) idle  6) (integer) 24833882) 1) name  2) "Bob"  3) pending  4) (integer) 2  5) idle  6) (integer) 484537553) 1) name  2) "Gates"  3) pending  4) (integer) 0  5) idle  6) (integer) 2385114

3.7删除消息数据
首先让我们检查现有数据:
redis:6379> XRANGE mystream - + COUNT 21) 1) 1531989605376-0  2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "19.8"2) 1) 1531994510562-0  2) 1) "test"      2) "1"

删除第一条数据:
redis:6379> XDEL mystream 1531989605376-0(integer) 1

再次检查, 你会发现先前的第一个数据不见了:
redis:6379> XRANGE mystream - + COUNT 21) 1) 1531994510562-0  2) 1) "test"      2) "1"2) 1) 1531994516257-0  2) 1) "test"      2) "2"

注意:如果使用XDEL, 它并不会真正从内存中删除数据, 而只是标记数据。它不会回收内存。
3.8设置流的最大长度
添加数据并将最大长度指定为2:
redis:6379> XADD mystream MAXLEN 2 * value 11532049865028-0redis:6379> XADD mystream MAXLEN 2 * value 21532049872075-0redis:6379> XADD mystream MAXLEN 2 * value 31532049877554-0

我们添加了3条数据。现在让我们看一下流的长度和当前内容:
redis:6379> XLEN mystream(integer) 2redis:6379> XRANGE mystream - +1) 1) 1532049872075-0  2) 1) "value"      2) "2"2) 1) 1532049877554-0  2) 1) "value"      2) "3"

你可以看到只有2条数据。
4.总结以上是redis流的基本操作。亲自尝试后, 你将更好地理解流。

    推荐阅读