Redis实时发布/订阅

本文概述

  • 我们需要的东西
  • Redis发布-订阅模式
  • 实时且可扩展
  • 幕后花絮
  • 概述
  • 本文总结
扩展Web应用程序几乎总是一个有趣的挑战, 无论涉及的复杂性如何。但是, 实时Web应用程序会带来独特的可伸缩性问题。例如, 为了能够水平扩展使用WebSocket与其客户端通信的消息传递Web应用程序, 它将需要以某种方式同步其所有服务器节点。如果没有考虑到这一点, 那么将其水平缩放可能不是一个容易的选择。
在本文中, 我们将介绍一个简单的实时图像共享和消息传递Web应用程序的体系结构。在这里, 我们将重点关注构建实时应用程序所涉及的各种组件, 例如Redis Pub/Sub, 并了解它们在整体架构中如何发挥作用。
Redis实时发布/订阅

文章图片
使用Redis Pub/Sub实时进行
鸣叫
在功能方面, 该应用程序非常轻巧。它允许上传图像和对这些图像的实时注释。此外, 任何用户都可以点击图像, 其他用户将可以在屏幕上看到涟漪效果。
该应用程序的完整源代码可在GitHub上找到。
我们需要的东西 Go
我们将使用编程语言Go。除了Go的语法简洁而且其语义更易于理解之外, 我们在本文中选择Go并没有特殊的原因。当然, 还有作者的偏见。但是, 本文讨论的所有概念都可以轻松转换为你选择的语言。
Go入门很容易。可以从官方站点下载其二进制发行版。如果你使用的是Windows, 则在其下载页面上有用于Go的MSI安装程序。或者, 如果你的操作系统提供了程序包管理器, 则:
Arch Linux:
pacman -S go

Ubuntu:
apt-get install golang

Mac OS X:
brew install go

仅当我们安装了Homebrew时, 此功能才有效。
MongoDB
你问, 如果我们有Redis, 为什么要使用MongoDB?如前所述, Redis是内存数据存储。尽管可以将数据持久保存到磁盘, 但使用Redis可能不是最好的方法。我们将使用MongoDB来存储上传的图像元数据和消息。
我们可以从其官方网站下载MongoDB。在某些Linux发行版中, 这是安装MongoDB的首选方式。尽管如此, 仍然应该可以使用大多数发行版的软件包管理器进行安装。
Arch Linux:
pacman -S mongodb

Ubuntu:
apt-get install mongodb

Mac OS X:
brew install mongodb

在我们的Go代码中, 我们将使用软件包mgo(发音为mango)。驱动程序包不仅经过了实战测试, 还提供了一个非常干净简单的API。
如果你不是MongoDB专家, 请不要担心。在我们的示例应用程序中, 此数据库服务的使用最少, 并且与本文的重点(发布/订阅体系结构)几乎无关。
亚马逊S3
我们将使用Amazon S3存储用户上传的图像。除了确保我们拥有一个准备好Amazon Web Services的帐户并创建一个临时存储桶之外, 这里没有什么可做的。
将上传的文件存储到本地磁盘不是一种选择, 因为我们不想以任何方式依赖我们的Web节点的身份。我们希望用户能够连接到任何可用的Web节点, 并且仍然能够看到相同的内容。
为了与我们的Go代码中的Amazon S3存储桶进行交互, 我们将使用AdRoll/goamz, 这是Canonical的goamz软件包的分支, 但有所不同。
雷迪斯
最后但并非最不重要:Redis。我们可以使用我们发行版的软件包管理器进行安装:
Arch Linux:
pacman -S redis

Ubuntu:
apt-get install redis-server

Mac OS X:
brew install redis

或者, 获取其源代码并自行编译。 Redis除了GCC和libc之外没有其他依赖项来构建它:
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

Redis安装并运行后, 启动终端并输入Redis的CLI:
redis-cli

尝试输入以下命令, 看看是否获得预期的输出:
SET answer 41 INCR answer GET answer

第一个命令针对键” answer” 存储” 41″ , 第二个命令对值进行递增, 第三个命令针对给定的键打印存储的值。结果应显示为” 42″ 。
你可以在Redis官方网站上了解有关Redis支持的所有命令的更多信息。
我们将使用Go包redigo从我们的应用程序代码中连接到Redis。
Redis发布-订阅模式 发布-订阅模式是一种将消息传递到任意数量的发件人的方式。这些消息的发件人(发布者)没有明确标识目标收件人。而是将消息发送到任何数量的收件人(订阅者)可以在其上等待它们的通道上。
Redis实时发布/订阅

文章图片
在我们的情况下, 我们可以在负载均衡器后面运行任意数量的Web节点。在任何给定时刻, 两个看同一图像的用户可能不会连接到同一节点。这是Redis Pub/Sub发挥作用的地方。每当Web节点需要观察到更改时(例如, 用户创建了一条新消息), 它将使用Redis Pub/Sub将信息广播到所有相关的Web节点。依次将信息传播到相关客户端, 以便它们可以获取messageredis的更新列表。
由于发布-订阅模式允许我们在命名频道上发送消息, 因此我们可以将每个Web节点连接到Redis, 并仅订阅其连接的用户感兴趣的那些频道。例如, 如果两个用户都在查看相同的图像, 但连接到许多Web节点中的两个不同的Web节点, 则只有这两个Web节点需要订阅相应的频道。在该通道上发布的任何消息将仅传递到这两个Web节点。
【Redis实时发布/订阅】听起来好得令人难以置信?我们可以使用Redis的CLI进行尝试。启动三个redis-cli实例。在第一个实例中执行以下命令:
SUBSCRIBE somechannel

在第二个Redis CLI实例中执行以下命令:
SUBSCRIBE someotherchannel

在Redis CLI的第三个实例中执行以下命令:
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

注意第一个实例如何接收” lorem” 而不是” ipsum” , 第二个实例如何接收” ipsum” 而不是” lorem” 。
Redis实时发布/订阅

文章图片
值得一提的是, Redis客户端一旦进入订阅者模式, 便无法执行任何操作, 只能订阅更多频道或取消订阅频道。这意味着每个Web节点将需要维持与Redis的两个连接, 一个要作为订户连接到Redis, 另一个要在通道上发布消息, 以便订阅这些通道的任何Web节点都可以接收它们。
实时且可扩展 在开始探究幕后发生的事情之前, 让我们克隆存储库:
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

…并编译:
go build ./cmd/tonesad

要运行该应用程序, 首先创建一个名为.env的文件(最好通过复制文件env-sample.txt):
cp env-sample.txt .env

使用所有必需的环境变量填写.env文件:
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

最后运行构建的二进制文件:
PORT=9091 ./tonesad -env-file=.env

Web节点现在应该正在运行, 并且可以通过http:// localhost:9091访问。
Redis实时发布/订阅

文章图片
要测试它在水平缩放时是否仍然有效, 可以通过使用不同的端口号启动它来旋转多个Web节点:
PORT=9092 ./tonesad -env-file=.env

PORT=9093 ./tonesad -env-file=.env

…并通过相应的URL访问它们:http:// localhost:9092和http:// localhost:9093。
Redis实时发布/订阅

文章图片
幕后花絮 我们将不着眼于应用程序开发的每个步骤, 而将重点放在一些最重要的部分上。尽管并非所有这些都与Redis Pub/Sub及其实时含义有关, 但它们仍与应用程序的整体结构有关, 并且一旦我们深入研究, 它们就变得更容易跟进了。
为简单起见, 我们不会打扰用户身份验证。上载将是匿名的, 并且所有知道该URL的人都可以使用。所有查看者都可以发送消息, 并且可以选择自己的别名。调整适当的身份验证机制和隐私功能应该是微不足道的, 这不在本文的讨论范围之内。
持久数据
这很容易。
每当用户上传图片时, 我们都会将其存储在Amazon S3中, 然后使用两个ID将其路径存储在MongoDB中:一个BSON对象ID(MongoDB的最爱), 另一个短8字符长的ID(有点令人赏心悦目)。这进入我们数据库的” 上载” 集合, 其结构如下:
type Upload struct { IDbson.ObjectId `bson:"_id"` ShortID string`bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAttime.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64`bson:"size"` }

字段Kind用于指示此” 上载” 包含的媒体类型。这是否意味着我们支持图像以外的媒体?抱歉不行。但是该字段留在那里提醒我们, 我们不必局限于此处的图像。
当用户彼此发送消息时, 它们存储在不同的集合中。是的, 你已经猜到了:” 消息” 。
type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Contentstring `bson:"content"` CreatedAttime.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

这里唯一有趣的位是UploadID字段, 该字段用于将消息关联到特定的上载。
API端点
该应用程序实质上具有三个端点。
POST/API /上传 该终结点的处理程序希望提交” 多部分/表单数据” , 并且图像在” 文件” 字段中。处理程序的行为大致如下:
func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(& b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID:id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go要求明确处理所有错误。该操作已在原型中完成, 但在本文的摘录中省略了, 以使重点放在关键部分上。
在此API端点的处理程序中, 我们实质上是在读取文件, 但将其大小限制为特定值。如果上载超过此值, 则拒绝该请求。否则, 将生成BSON ID, 并将其用于在将上传实体持久保存到MongoDB之前将图像上传到Amazon S3。
BSON对象ID的生成方式有其优点和缺点。它们在客户端上生成。但是, 用于生成对象ID的策略使冲突的可能性极小, 因此可以在客户端安全地生成它们。另一方面, 生成的对象ID的值通常是连续的, 这是Amazon S3不太喜欢的。一个简单的解决方法是在文件名前添加一个随机字符串。
GET/api/uploads/{id}/messages 此API用于获取最近的消息以及在特定时间之后发布的消息。
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr)msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

当用户的浏览器收到关于用户当前正在查看的上载中的新消息的通知时, 它将使用此端点来获取新消息。
POST/api/uploads/{id} /消息 最后, 创建消息并通知所有人的处理程序:
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(& body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

此处理程序与其他处理程序非常相似, 以至于甚至在这里都包含它也很无聊。还是?注意, 在函数的末尾有一个函数调用hub.Emit()。你说的是枢纽?那就是所有Pub/Sub魔术发生的地方。
集线器:WebSocket与Redis相遇的地方
Hub是我们将WebSocket与Redis的发布/订阅渠道粘合在一起的地方。而且, 巧合的是, 我们用来在Web服务器中处理WebSocket的软件包称为胶水。
集线器本质上维护着一些数据结构, 这些数据结构在所有连接的WebSocket到他们感兴趣的所有通道之间创建映射。例如, 用户浏览器选项卡上的WebSocket指向特定的上载图像自然应该对所有相关通知感兴趣对它。
集线器软件包实现了六个功能:
  • 订阅
  • 全部取消订阅
  • 发射
  • EmitLocal
  • InitHub
  • 手柄插口
全部订阅和取消订阅
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

与该软件包中的大多数其他函数一样, 该函数在执行时会在读/写互斥锁上保持锁定。这样, 我们就可以安全地修改原始数据结构变量套接字和主题。第一个变量, 套接字, 将套接字映射到通道名称, 而第二个主题, 主题, 将通道名称映射到套接字。在此功能中, 我们构建了这些映射。每当我们看到套接字订阅一个新的频道名称时, 我们就会使用subconn.Subscribe来建立我们的Redis连接subconn, 并在Redis上订阅该频道。这使Redis将该通道上的所有通知转发到此Web节点。
同样, 在UnsubscribeAll函数中, 我们拆解了映射:
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

当我们从特定通道感兴趣的数据结构中删除最后一个套接字时, 我们使用subconn.Unsubscribe从Redis中的通道取消订阅。
发射
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

此功能使用到Redis的发布连接在通道t上发布消息m。
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data))case error: panic(v) } } }() return nil }

在InitHub函数中, 我们将创建两个与Redis的连接:一个用于订阅该Web节点感兴趣的频道, 另一个用于发布消息。建立连接后, 我们将启动一个新的Go例程, 该例程将永远运行一个循环, 等待通过与Redis的订户连接接收消息。每次收到消息时, 它都会在本地发出消息(即发送到连接到该Web节点的所有WebSocket)。
手柄插口 最后, HandleSocket是我们等待消息通过WebSockets到达或在连接关闭后清除的地方:
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1])case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+", "+fields[3]) } }) }

前端JavaScript
由于胶水带有其自己的前端JavaScript库, 因此处理WebSockets更加容易(或者在WebSockets不可用时回退到XHR轮询):
var socket = glue() socket.onMessage(function(data) { data = http://www.srcmini.com/data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) breakcase 'touch': var coords = data[1].split(', ') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

在客户端, 我们正在侦听通过WebSocket传入的任何消息。由于胶水将所有消息作为字符串传输, 因此我们使用特定的模式对其中的所有信息进行编码:
  • 新消息:” 消息:{messageID}”
  • 点击图片:” 触摸:{coordX}, {coordY}” , 其中coordX和coordY是用户在图片上点击位置的百分比坐标
当用户创建新消息时, 我们使用” POST/api/uploads/{uploadID}/messages” API来创建新消息。这是使用骨干集合上的消息的create方法完成的:
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

当用户点击图片时, 我们以图片的宽度和高度的百分比计算点击位置, 然后直接通过WebSocket发送信息。
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left)/$contentImgEl.width()+' '+(event.pageY - offset.top)/$contentImgEl.height())

概述
Redis实时发布/订阅

文章图片
当用户键入消息并按Enter键时, 客户端将调用” POST/api/uploads/{id}/messages” API端点。反过来, 这会在数据库中创建消息实体, 并通过集线器软件包通过Redis Pub/Sub在通道” upload:{uploadID}” 上发布字符串” message:{messageID}” 。
Redis将该字符串转发给对通道” upload:{uploadID}” 感兴趣的每个Web节点(订户)。接收到该字符串的Web节点将遍历与该通道相关的所有WebSocket, 并通过其WebSocket连接将字符串发送给客户端。接收此字符串的客户端开始使用” GET/api/uploads/{id}/messages” 从服务器获取新消息。
类似地, 为了在图像上传播点击事件, 客户端直接通过WebSocket发送一条消息, 该消息看起来类似于” 触摸上载:{uploadID} {coordX} {coordY}” 。该消息最终在集线器包中结束, 在该集线器包中, 它在相同的通道” upload:{uploadID}” 上发布。结果, 该字符串将分发给查看上载图像的所有用户。客户端在收到此字符串后会对其进行解析以提取坐标, 并绘制一个逐渐变淡的圆圈以突出显示单击位置。
本文总结 在本文中, 我们看到了” 发布-订阅” 模式如何可以在很大程度上相对轻松地帮助解决实时Web应用程序扩展问题。
该示例应用程序可作为试验Redis Pub/Sub的游乐场。但是, 如前所述, 这些思想几乎可以用任何其他流行的编程语言来实现。

    推荐阅读