Golang|Golang NSQ 消息队列使用实战
网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。
本人博客,关键词 Less-Bug.com ,欢迎关注。
NSQ 的全家桶介绍
- nsqd:守护进程,客户端通信。默认端口
4150
(TCP)4151
(HTTP) - nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。一个 nsqlookupd 能够管理一群 nsqd。默认端口:
:4160
(TCP),:4161
(HTTP) - nsqadmin:在线面板,能够通过浏览器直接访问。默认端口
:4171
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161
go-nsq 的使用 我封装了一个包:
package mqimport (
"encoding/json"
"fmt"
"time""github.com/nsqio/go-nsq"
"go.uber.org/zap"
)type MessageQueueConfig struct {
NsqAddrstring
NsqLookupdAddrstring
SupportedTopics []string
}type MessageQueue struct {
configMessageQueueConfig
producer*nsq.Producer
consumers map[string]*nsq.Consumer
}func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
zap.L().Debug("New message queue")
producer, err := initProducer(config.NsqAddr)
if err != nil {
return nil, err
}
consumers := make(map[string]*nsq.Consumer)
for _, topic := range config.SupportedTopics {
nsq.Register(topic,"default")
consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)
if err != nil {
return
}
}
return &MessageQueue{
config:config,
producer:producer,
consumers: consumers,
}, nil
}func (mq *MessageQueue) Run() {
for name, c := range mq.consumers {
zap.L().Info("Run consumer for " + name)
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
c.ConnectToNSQD(mq.config.NsqAddr)
}
}func initProducer(addr string) (producer *nsq.Producer, err error) {
zap.L().Debug("initProducer to " + addr)
config := nsq.NewConfig()
producer, err = nsq.NewProducer(addr, config)
return
}func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
zap.L().Debug("initConsumer to " + topic + "/" + channel)
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err = nsq.NewConsumer(topic, channel, config)
return
}func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {
body, err := json.Marshal(data)
if err != nil {
return
}
zap.L().Info("Pub " + name + " to mq. data = "https://www.it610.com/article/+ string(body))
return mq.producer.Publish(name, body)
}type Messagehandler func(v []byte)func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {
zap.L().Info("Subscribe " + name)
v, ok := mq.consumers[name]
if !ok {
err = fmt.Errorf("No such topic: " + name)
return
}
v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
handler(message.Body)
return nil
}))
return
}
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
NsqAddr:"127.0.0.1:4150",
NsqLookupdAddr:"127.0.0.1:4161",
SupportedTopics: []string{"hello"},
})if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}m.Sub("hello", func(resp []byte) {
zap.L().Info("S1 Got: " + string(resp))
})
m.Sub("hello", func(resp []byte) {
zap.L().Info("S2 Got: " + string(resp))
})
m.Run()
err = m.Pub("hello", "world")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
err = m.Pub("hello", "tom")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
os.Exit(0);
主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。
输出结果:
2021-11-07T19:13:41.886+0800DEBUGmq/mq.go:29New message queue
2021-11-07T19:13:41.886+0800DEBUGmq/mq.go:58initProducer to 127.0.0.1:4150
2021-11-07T19:13:41.887+0800DEBUGmq/mq.go:65initConsumer to hello/default
2021-11-07T19:13:41.887+0800INFOmq/mq.go:84Subscribe hello
2021-11-07T19:13:41.887+0800INFOmq/mq.go:84Subscribe hello
2021-11-07T19:13:41.887+0800INFOmq/mq.go:51Run consumer for hello
2021/11/07 19:13:41 INF2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.887+0800INFOmq/mq.go:77Pub hello to mq. data = "https://www.it610.com/article/world"
2021/11/07 19:13:41 INF1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.888+0800INFOmq/mq.go:77Pub hello to mq. data = "https://www.it610.com/article/tom"
2021-11-07T19:13:41.888+0800INFObuqi-admin-backend/main.go:60S1 Got: "world"
2021-11-07T19:13:41.888+0800INFObuqi-admin-backend/main.go:63S2 Got: "tom"
从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息。
遇到的问题 TOPIC_NOT_FOUND
遇到两个原因。
其一是大小写,Topic 名是大小写敏感的,因此
Hello
和 hello
是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用
/topic/create
接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。发现客户端轮询 HTTP
这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。
对于小项目,可以绕过 NsqLookupd:
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
c.ConnectToNSQD(mq.config.NsqAddr)
如何让多个消费者消费同一个 topic?
【Golang|Golang NSQ 消息队列使用实战】显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。
第二种方法是根据用途定义 channel 名。
第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。
第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。
我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)
实现流水线 Handler
package mqimport (
"encoding/json"
"fmt"
"time""github.com/nsqio/go-nsq"
"go.uber.org/zap"
)type MessageQueueConfig struct {
NsqAddrstring
NsqLookupdAddrstring
EnableLookupdbool
SupportedTopics []string
}type MessageQueue struct {
subscribers map[string]Subscriber
configMessageQueueConfig
producer*nsq.Producer
}type Messagehandler func(v []byte) bool// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil
type LinkedHandlerNode struct {
Handler*Messagehandler
Indexint
NextNode *LinkedHandlerNode
}type Subscriber struct {
HandlerHeadNode *LinkedHandlerNode
Consumer*nsq.Consumer
Handlernsq.HandlerFunc
}func createProducer(addr string) (producer *nsq.Producer, err error) {
zap.L().Debug("initProducer to " + addr)
config := nsq.NewConfig()
producer, err = nsq.NewProducer(addr, config)
return
}func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
zap.L().Debug("initConsumer to " + topic + "/" + channel)
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err = nsq.NewConsumer(topic, channel, config)
return
}func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
zap.L().Debug("New message queue")
producer, err := createProducer(config.NsqAddr)
if err != nil {
return nil, err
}
subscribers := make(map[string]Subscriber)
for _, topic := range config.SupportedTopics {
nsq.Register(topic, "default")
consumer, err := createConsumer(topic, "default", config.NsqAddr)
if err != nil {
return nil, err
}
// 头节点不参与实际使用,所以 Index = -1
headNode := &LinkedHandlerNode{Index: -1}
hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
// 循环链式调用各个 Handler
curNode := headNode.NextNode
// 当不存在任何用户定义的 Handler 时抛出警告
if(nil == curNode){
return fmt.Errorf("No handler provided!")
}
for nil != curNode {
msg := message.Body
zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
stop := (*curNode.Handler)(msg)
if stop {
zap.S().Debugf("the message has stopped spreading ")
break
}
curNode = curNode.NextNode
}
return nil
})
consumer.AddHandler(hubHandler)
subscribers[topic] = Subscriber{
Consumer:consumer,
HandlerHeadNode: headNode,
}
}
return &MessageQueue{
config:config,
producer:producer,
subscribers: subscribers,
}, nil
}func (mq *MessageQueue) Run() {
for name, s := range mq.subscribers {
zap.L().Info("Run consumer for " + name)
if mq.config.EnableLookupd {
s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
} else {
s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
}
}
}func (mq *MessageQueue) IsTopicSupported(topic string) bool {for _, v := range mq.config.SupportedTopics {
if v == topic {
return true
}
}
return false
}// Pub 向消息队列发送一个消息
func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
body, err := json.Marshal(data)
if err != nil {
return
}
zap.L().Info("Pub " + topic + " to mq. data = "https://www.it610.com/article/+ string(body))
return mq.producer.Publish(topic, body)
}// Sub 从消息队列订阅一个消息
func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
if !mq.IsTopicSupported(topic) {
err = fmt.Errorf("unsupported topic name: " + topic)
return
}
zap.L().Info("Subscribe " + topic)
subscriber, ok := mq.subscribers[topic]
if !ok {
err = fmt.Errorf("No such topic: " + topic)
return
}
// 抵达最后一个有效链表节点
curNode := subscriber.HandlerHeadNode
for nil != curNode.NextNode {
curNode = curNode.NextNode
}
// 创建节点
curNode.NextNode = &LinkedHandlerNode{
Handler:&handler,
Index:1 + curNode.Index,
NextNode: nil,
}
return
}
这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
NsqAddr:"127.0.0.1:4150",
NsqLookupdAddr:"127.0.0.1:4161",
SupportedTopics: []string{"hello"},
EnableLookupd:false,
})if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S1 Got: " + string(resp))
return false
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S2 Got: " + string(resp))
return true
})
m.Sub("hello", func(resp []byte) bool {
zap.L().Info("S3 Got: " + string(resp))
return false
})
m.Run()
err = m.Pub("hello", "world")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}
err = m.Pub("hello", "tom")
if err != nil {
zap.L().Fatal("Message queue error: " + err.Error())
}sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
os.Exit(0)
输出:
2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:40New message queue
2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:89initProducer to 127.0.0.1:4150
2021-11-07T20:30:38.448+0800DEBUGmq/mq.go:96initConsumer to hello/default
2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello
2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello
2021-11-07T20:30:38.448+0800INFOmq/mq.go:113Subscribe hello
2021-11-07T20:30:38.448+0800INFOmq/mq.go:82Run consumer for hello
2021/11/07 20:30:38 INF2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.454+0800INFOmq/mq.go:108Pub hello to mq. data = "https://www.it610.com/article/world"
2021/11/07 20:30:38 INF1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.455+0800INFOmq/mq.go:108Pub hello to mq. data = "https://www.it610.com/article/tom"
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:60S1 Got: "world"
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:64S2 Got: "world"
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:60the message has stopped spreading
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:60S1 Got: "tom"
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:57handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800INFObuqi-admin-backend/main.go:64S2 Got: "tom"
2021-11-07T20:30:38.455+0800DEBUGmq/mq.go:60the message has stopped spreading
^C
可以看到,Handler 返回 true 时,就可以阻断消息的传播。
推荐阅读
- 危险也是机会
- python学习之|python学习之 实现QQ自动发送消息
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 三门问题(蒙提霍尔悖论)分析与Golang模拟
- 夏夜|夏夜 我们
- Vue组件之事件总线和消息发布订阅详解
- Redis——发布订阅/消息队列
- Java消息中间件概念基础
- 【20190827复盘】——好消息
- MQ(消息队列)功能介绍