本文概述
- 1.简介
- 2.安装
- 3.用法
- 4.总结
流是日志形式的存储结构, 你可以在其中添加数据。它将为每个数据生成一个时间戳ID。而且流还具有读取数据的便捷模型。
因此, 流适用于消息队列和时间序列存储。
2.安装我们需要使用最新版本的Redis 5.0, 在这里我们使用docker redis容器:
docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3
Redis客户端:
docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379
启动后将进入交互式命令行:
redis:6379>
3.用法3.1向流添加元素
流元素可以是一个或多个键值对。让我们向流中添加元素:
redis:6379>
XADD mystream * sensor-id 1234 temperature 19.81531989605376-0
从上面我们可以知道:
- mystream是流的关键;
- *位置的参数为元素ID, *表示系统自动生成元素ID。
- 添加的元素包含2个键值对, 传感器ID 1234和温度19.8;
- 返回的值是新添加的元素的ID, 由时间戳和递增数字组成。
redis:6379>
XLEN mystream(integer) 1
3.2范围查询
当我们要使用范围查询时, 我们需要指定开始和结束ID, 这等效于指定时间范围:
redis:6379>
XRANGE mystream 1531989605376 15319896053771) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
你可以将-用于最小的ID, 而将+用于最大的ID:
redis:6379>
XRANGE mystream - +1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
当返回的元素太多时, 你可以限制返回结果的数量, 就像查询数据库并通过COUNT参数指定它时分页一样:
redis:6379>
XRANGE mystream - + COUNT 21) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
你还可以使用XREVRANGE命令来反向查询, 其用法与XRANGE相同。
3.3聆听流的新元素
redis:6379>
XREAD COUNT 2 STREAMS mystream 01) 1) "mystream"
2) 1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
STREAMS之后的mystream指定目标流的密钥; 0是最小的ID, 我们需要获得大于指定流中指定ID的元素; COUNT是指我们要获取的元素数。
可以一起指定多个流, 例如STREAMS mystream otherstream 0 0。
3.3.1阻止监听器如果你在客户端1中执行以下操作:
redis:6379>
XREAD BLOCK 0 STREAMS mystream $
它将进入等待状态。
如果你向客户端2添加元素:
redis:6379>
XADD mystream * test 1
刚添加的元素将显示在客户端1中:
1) 1) "mystream"
2) 1) 1) 1531994510562-0
2) 1) "test"
2) "1"
0是指定的超时, 因此0表示从不超时; $表示流中的最大ID。
3.4消费群体
当流的数量非常大时, 或者当使用者处理非常耗时时, 如果只有一个使用者, 它将承受更大的压力。因此, redis流提供了使用者组的概念, 从而允许多个使用者处理同一个流以实现负载平衡。
例如, 如果有3个使用者C1, C2和C3, 并且流中有7个消息元素, 则这些使用者的分配为:
1 ->
C12 ->
C23 ->
C34 ->
C15 ->
C26 ->
C37 ->
C1
3.4.1创建消费者组
redis:6379>
XGROUP CREATE mystream mygroup01 $OK
这里为流mystream创建了一个消费者组, 该组的名称为mygroup01; $表示读取当前最大ID之后的元素。
3.4.2添加测试数据添加一些新数据:
redis:6379>
XADD mystream * message apple1531999977149-0redis:6379>
XADD mystream * message orange1531999980272-0redis:6379>
XADD mystream * message strawberry1531999983493-0redis:6379>
XADD mystream * message apricot1531999988458-0redis:6379>
XADD mystream * message banana1531999991782-0
3.4.3通过消费者组读取数据
redis:6379>
XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1531999977149-0
2) 1) "message"
2) "apple"
爱丽丝(Alice)是该组成员的名称, > 表示该??组的成员到目前为止尚未读取数据。
如你所见, 你无需预先创建组成员, 因为它们将在首次使用时自动创建。
然后让我们创建另一个成员以读取数据:
redis:6379>
XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1531999980272-0
2) 1) "message"
2) "orange"
3.4.4消费历史
redis:6379>
XREADGROUP GROUP mygroup01 Alice STREAMS mystream 01) 1) "mystream"
2) 1) 1) 1531999977149-0
2) 1) "message"
2) "apple"
此处最后指定的ID为0, 因此你可以获得未决的历史记录数据(已使用但未发送消耗确认的数据), 并且可以帮助恢复后的工作。
3.4.5消费确认
redis:6379>
XACK mystream mygroup01 1531999977149-0(integer) 1
1531999977149-0是Alice消耗的苹果数据。现在让我们检查爱丽丝的消费历史记录:
redis:6379>
XREADGROUP GROUP mygroup01 Alice STREAMS mystream 01) 1) "mystream"
2) (empty list or set)
它已经是空的。
3.4.6故障处理因此, 当使用者遇到问题时, 你可以获取尚未确认的消息数据, 然后将其恢复。这是一种安全机制, 但是如果不能再恢复有问题的使用者, 该怎么办?有什么方法可以处理尚未确认的消息数据?
redis流中提供了一个解决方案来处理这种情况:
- 找出所有已传递但未确认的消息数据;
- 更改这些数据的所有者。
列出未处理的数据:
redis:6379>
XPENDING mystream mygroup01 - + 101) 1) 1531999980272-0
2) "Bob"
3) (integer) 45126376
4) (integer) 22) 1) 1531999983493-0
2) "Tom"
3) (integer) 867475
4) (integer) 1
你会看到有2个未处理的数据, 并且列出了每个数据的ID, 所有者, 此消息的空闲时间(以毫秒为单位)以及该消息的传递次数。
更改所有者:
redis:6379>
XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-01) 1) 1531999980272-0
2) 1) "message"
2) "orange"2) 1) 1531999983493-0
2) 1) "message"
2) "strawberry"
它将指定2个ID的消息传递给Gates。 3600是指最短的空闲时间, 它将空闲时间大于3600的指定消息分配给Gates。请注意, 盖茨是一个新消费者。之前尚未声明过。
检查到目前为止盖茨尚未处理的数据:
redis:6379>
XREADGROUP GROUP mygroup01 Gates STREAMS mystream 01) 1) "mystream"
2) 1) 1) 1531999980272-0
2) 1) "message"
2) "orange"
2) 1) 1531999983493-0
2) 1) "message"
2) "strawberry"
有2个新分配的数据。
3.5查看相关信息
查看基本信息:
redis:6379>
XINFO STREAM mystream1) length2) (integer) 153) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 29) first-entry10) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"11) last-entry12) 1) 1531999991782-0
2) 1) "message"
2) "banana"
查看消费者组信息:
redis:6379>
XINFO GROUPS mystream1) 1) name
2) "mygroup"
3) consumers
4) (integer) 3
5) pending
6) (integer) 52) 1) name
2) "mygroup01"
3) consumers
4) (integer) 4
5) pending
6) (integer) 2
查看有关组中消费者的信息:
redis:6379>
XINFO CONSUMERS mystream mygroup1) 1) name
2) "Alice"
3) pending
4) (integer) 3
5) idle
6) (integer) 24833882) 1) name
2) "Bob"
3) pending
4) (integer) 2
5) idle
6) (integer) 484537553) 1) name
2) "Gates"
3) pending
4) (integer) 0
5) idle
6) (integer) 2385114
3.7删除消息数据
首先让我们检查现有数据:
redis:6379>
XRANGE mystream - + COUNT 21) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"2) 1) 1531994510562-0
2) 1) "test"
2) "1"
删除第一条数据:
redis:6379>
XDEL mystream 1531989605376-0(integer) 1
再次检查, 你会发现先前的第一个数据不见了:
redis:6379>
XRANGE mystream - + COUNT 21) 1) 1531994510562-0
2) 1) "test"
2) "1"2) 1) 1531994516257-0
2) 1) "test"
2) "2"
注意:如果使用XDEL, 它并不会真正从内存中删除数据, 而只是标记数据。它不会回收内存。3.8设置流的最大长度
添加数据并将最大长度指定为2:
redis:6379>
XADD mystream MAXLEN 2 * value 11532049865028-0redis:6379>
XADD mystream MAXLEN 2 * value 21532049872075-0redis:6379>
XADD mystream MAXLEN 2 * value 31532049877554-0
我们添加了3条数据。现在让我们看一下流的长度和当前内容:
redis:6379>
XLEN mystream(integer) 2redis:6379>
XRANGE mystream - +1) 1) 1532049872075-0
2) 1) "value"
2) "2"2) 1) 1532049877554-0
2) 1) "value"
2) "3"
你可以看到只有2条数据。
4.总结以上是redis流的基本操作。亲自尝试后, 你将更好地理解流。
推荐阅读
- 深入了解Spark内存管理模型
- Android手动回收bitmap,引发Canvas: trying to use a recycled bitmap处理
- app调用webapi时候出现读取不到session的问题
- Android破解学习之路——Android游戏 滚动的天空破解
- spring JdbcTemplate 查询,使用BeanPropertyRowMapper
- .Net语言 APP开发平台——Smobiler学习日志(在应用中添加WeiXin组件)
- android studio第一次使用时的界面设置
- Android百分比布局成功导入及简单使用
- Android之AbsoluteLayout(绝对布局)