Golang|Golang NSQ 消息队列使用实战

网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。
本人博客,关键词 Less-Bug.com ,欢迎关注。
NSQ 的全家桶介绍

  • nsqd:守护进程,客户端通信。默认端口 4150(TCP) 4151(HTTP)
  • nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。一个 nsqlookupd 能够管理一群 nsqd。默认端口::4160(TCP),:4161(HTTP)
  • nsqadmin:在线面板,能够通过浏览器直接访问。默认端口 :4171
从命令行启动 可以直接下载二进制文件。开三个终端,分别执行:
nsqlookupd nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 nsqadmin --lookupd-http-address=127.0.0.1:4161

go-nsq 的使用 我封装了一个包:
package mqimport ( "encoding/json" "fmt" "time""github.com/nsqio/go-nsq" "go.uber.org/zap" )type MessageQueueConfig struct { NsqAddrstring NsqLookupdAddrstring SupportedTopics []string }type MessageQueue struct { configMessageQueueConfig producer*nsq.Producer consumers map[string]*nsq.Consumer }func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) { zap.L().Debug("New message queue") producer, err := initProducer(config.NsqAddr) if err != nil { return nil, err } consumers := make(map[string]*nsq.Consumer) for _, topic := range config.SupportedTopics { nsq.Register(topic,"default") consumers[topic], err = initConsumer(topic, "default", config.NsqAddr) if err != nil { return } } return &MessageQueue{ config:config, producer:producer, consumers: consumers, }, nil }func (mq *MessageQueue) Run() { for name, c := range mq.consumers { zap.L().Info("Run consumer for " + name) // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) c.ConnectToNSQD(mq.config.NsqAddr) } }func initProducer(addr string) (producer *nsq.Producer, err error) { zap.L().Debug("initProducer to " + addr) config := nsq.NewConfig() producer, err = nsq.NewProducer(addr, config) return }func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) { zap.L().Debug("initConsumer to " + topic + "/" + channel) config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err = nsq.NewConsumer(topic, channel, config) return }func (mq *MessageQueue) Pub(name string, data interface{}) (err error) { body, err := json.Marshal(data) if err != nil { return } zap.L().Info("Pub " + name + " to mq. data = "https://www.it610.com/article/+ string(body)) return mq.producer.Publish(name, body) }type Messagehandler func(v []byte)func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) { zap.L().Info("Subscribe " + name) v, ok := mq.consumers[name] if !ok { err = fmt.Errorf("No such topic: " + name) return } v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { handler(message.Body) return nil })) return }

使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ NsqAddr:"127.0.0.1:4150", NsqLookupdAddr:"127.0.0.1:4161", SupportedTopics: []string{"hello"}, })if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) }m.Sub("hello", func(resp []byte) { zap.L().Info("S1 Got: " + string(resp)) }) m.Sub("hello", func(resp []byte) { zap.L().Info("S2 Got: " + string(resp)) }) m.Run() err = m.Pub("hello", "world") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } err = m.Pub("hello", "tom") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) }sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan os.Exit(0);

主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。
输出结果:
2021-11-07T19:13:41.886+0800DEBUGmq/mq.go:29New message queue 2021-11-07T19:13:41.886+0800DEBUGmq/mq.go:58initProducer to 127.0.0.1:4150 2021-11-07T19:13:41.887+0800DEBUGmq/mq.go:65initConsumer to hello/default 2021-11-07T19:13:41.887+0800INFOmq/mq.go:84Subscribe hello 2021-11-07T19:13:41.887+0800INFOmq/mq.go:84Subscribe hello 2021-11-07T19:13:41.887+0800INFOmq/mq.go:51Run consumer for hello 2021/11/07 19:13:41 INF2 [hello/default] (127.0.0.1:4150) connecting to nsqd 2021-11-07T19:13:41.887+0800INFOmq/mq.go:77Pub hello to mq. data = "https://www.it610.com/article/world" 2021/11/07 19:13:41 INF1 (127.0.0.1:4150) connecting to nsqd 2021-11-07T19:13:41.888+0800INFOmq/mq.go:77Pub hello to mq. data = "https://www.it610.com/article/tom" 2021-11-07T19:13:41.888+0800INFObuqi-admin-backend/main.go:60S1 Got: "world" 2021-11-07T19:13:41.888+0800INFObuqi-admin-backend/main.go:63S2 Got: "tom"

从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息。
遇到的问题 TOPIC_NOT_FOUND
遇到两个原因。
其一是大小写,Topic 名是大小写敏感的,因此 Hellohello 是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。
其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用 /topic/create 接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。
发现客户端轮询 HTTP
这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。
对于小项目,可以绕过 NsqLookupd:
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) c.ConnectToNSQD(mq.config.NsqAddr)

如何让多个消费者消费同一个 topic?
【Golang|Golang NSQ 消息队列使用实战】显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。
第二种方法是根据用途定义 channel 名。
第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。
第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。
我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)
实现流水线 Handler
package mqimport ( "encoding/json" "fmt" "time""github.com/nsqio/go-nsq" "go.uber.org/zap" )type MessageQueueConfig struct { NsqAddrstring NsqLookupdAddrstring EnableLookupdbool SupportedTopics []string }type MessageQueue struct { subscribers map[string]Subscriber configMessageQueueConfig producer*nsq.Producer }type Messagehandler func(v []byte) bool// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil type LinkedHandlerNode struct { Handler*Messagehandler Indexint NextNode *LinkedHandlerNode }type Subscriber struct { HandlerHeadNode *LinkedHandlerNode Consumer*nsq.Consumer Handlernsq.HandlerFunc }func createProducer(addr string) (producer *nsq.Producer, err error) { zap.L().Debug("initProducer to " + addr) config := nsq.NewConfig() producer, err = nsq.NewProducer(addr, config) return }func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) { zap.L().Debug("initConsumer to " + topic + "/" + channel) config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err = nsq.NewConsumer(topic, channel, config) return }func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) { zap.L().Debug("New message queue") producer, err := createProducer(config.NsqAddr) if err != nil { return nil, err } subscribers := make(map[string]Subscriber) for _, topic := range config.SupportedTopics { nsq.Register(topic, "default") consumer, err := createConsumer(topic, "default", config.NsqAddr) if err != nil { return nil, err } // 头节点不参与实际使用,所以 Index = -1 headNode := &LinkedHandlerNode{Index: -1} hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error { // 循环链式调用各个 Handler curNode := headNode.NextNode // 当不存在任何用户定义的 Handler 时抛出警告 if(nil == curNode){ return fmt.Errorf("No handler provided!") } for nil != curNode { msg := message.Body zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic) stop := (*curNode.Handler)(msg) if stop { zap.S().Debugf("the message has stopped spreading ") break } curNode = curNode.NextNode } return nil }) consumer.AddHandler(hubHandler) subscribers[topic] = Subscriber{ Consumer:consumer, HandlerHeadNode: headNode, } } return &MessageQueue{ config:config, producer:producer, subscribers: subscribers, }, nil }func (mq *MessageQueue) Run() { for name, s := range mq.subscribers { zap.L().Info("Run consumer for " + name) if mq.config.EnableLookupd { s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) } else { s.Consumer.ConnectToNSQD(mq.config.NsqAddr) } } }func (mq *MessageQueue) IsTopicSupported(topic string) bool {for _, v := range mq.config.SupportedTopics { if v == topic { return true } } return false }// Pub 向消息队列发送一个消息 func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) { if !mq.IsTopicSupported(topic) { err = fmt.Errorf("unsupported topic name: " + topic) return } body, err := json.Marshal(data) if err != nil { return } zap.L().Info("Pub " + topic + " to mq. data = "https://www.it610.com/article/+ string(body)) return mq.producer.Publish(topic, body) }// Sub 从消息队列订阅一个消息 func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) { if !mq.IsTopicSupported(topic) { err = fmt.Errorf("unsupported topic name: " + topic) return } zap.L().Info("Subscribe " + topic) subscriber, ok := mq.subscribers[topic] if !ok { err = fmt.Errorf("No such topic: " + topic) return } // 抵达最后一个有效链表节点 curNode := subscriber.HandlerHeadNode for nil != curNode.NextNode { curNode = curNode.NextNode } // 创建节点 curNode.NextNode = &LinkedHandlerNode{ Handler:&handler, Index:1 + curNode.Index, NextNode: nil, } return }

这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ NsqAddr:"127.0.0.1:4150", NsqLookupdAddr:"127.0.0.1:4161", SupportedTopics: []string{"hello"}, EnableLookupd:false, })if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) }m.Sub("hello", func(resp []byte) bool { zap.L().Info("S1 Got: " + string(resp)) return false }) m.Sub("hello", func(resp []byte) bool { zap.L().Info("S2 Got: " + string(resp)) return true }) m.Sub("hello", func(resp []byte) bool { zap.L().Info("S3 Got: " + string(resp)) return false }) m.Run() err = m.Pub("hello", "world") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } err = m.Pub("hello", "tom") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) }sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan os.Exit(0)

输出:
2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:40New message queue 2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:89initProducer to 127.0.0.1:4150 2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:96initConsumer to hello/default 2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello 2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello 2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello 2021-11-07T20:30:38.448+0800INFOmq/mq.go:82Run consumer for hello 2021/11/07 20:30:38 INF2 [hello/default] (127.0.0.1:4150) connecting to nsqd 2021-11-07T20:30:38.454+0800INFOmq/mq.go:108Pub hello to mq. data = "https://www.it610.com/article/world" 2021/11/07 20:30:38 INF1 (127.0.0.1:4150) connecting to nsqd 2021-11-07T20:30:38.455+0800INFOmq/mq.go:108Pub hello to mq. data = "https://www.it610.com/article/tom" 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[0] for hello is invoked 2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:60S1 Got: "world" 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[1] for hello is invoked 2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:64S2 Got: "world" 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:60the message has stopped spreading 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[0] for hello is invoked 2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:60S1 Got: "tom" 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[1] for hello is invoked 2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:64S2 Got: "tom" 2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:60the message has stopped spreading ^C

可以看到,Handler 返回 true 时,就可以阻断消息的传播。

    推荐阅读