go-micro v2 + mqtt 实现一个消息发布订阅系统

前话: 我们使用go-micro v2(v2.9.1)版本 + mqtt 消息队列技术实现一个发布订阅demo
1.下载mqtt到开发机 【go-micro v2 + mqtt 实现一个消息发布订阅系统】1.0 如下地址下载相应平台mqtt,本项目以Windows系统为列

https://activemq.apache.org/components/classic/download/

1.1 启动mqtt服务
解压下载的压缩包,进入bin目录双击运行对应版本的activemq.bat文件
go-micro v2 + mqtt 实现一个消息发布订阅系统
文章图片

运行结果如下
go-micro v2 + mqtt 实现一个消息发布订阅系统
文章图片

我们访问上图中的地址http://127.0.0.1:8161/进入mqtt的后台管理界面,用户名:admin 密码:admin 可得如下图:
go-micro v2 + mqtt 实现一个消息发布订阅系统
文章图片

http://127.0.0.1:8161/admin/queues.jsp
go-micro v2 + mqtt 实现一个消息发布订阅系统
文章图片

到此mqtt本地安装完成!
2. go-micro v2 本地环境搭建 搭建步骤请移步:https://blog.csdn.net/kkijhuybjju/article/details/108191967
3. mqtt 插件下载 broker/mqtt/v2为mqtt的golang语言客户端。其为匹配go-micro v2开发版本
go getgithub.com/micro/go-plugins/broker/mqtt/v2

4. broker/mqtt/v2 插件相关api 4.0 init初始化 ,导包
import ( _ "github.com/micro/go-plugins/broker/mqtt/v2"//init初始化 "github.com/micro/go-plugins/broker/mqtt/v2" )

4.1 订阅主题
//创建服务 service := micro.NewService(...options) //获取代理 pubSub := service.Server().Options().Broker //获取链接 err := pubSub.Connect() //订阅主题 subscriber,err := pubSub.Subscribe(topic,func,...options)

4.2 发布消息
//创建服务 service := micro.NewService(...options) //获取代理 pubSub := service.Server().Options().Broker //获取链接 err := pubSub.Connect() //发布消息 pubSub.Publish(topic,&msg)

4.3 相关问题
go run client.go 发生 github.com/eclipse/paho%2emqtt%2egolang.(*router)错误问题 问题解决:安装 go get github.com/eclipse/paho.mqtt.golang

5. client.go 代码
package mainimport ( "fmt" "github.com/golang/protobuf/proto" "github.com/micro/go-micro/v2" "github.com/micro/go-micro/v2/broker" "mqtt/message" _ "github.com/micro/go-plugins/broker/mqtt/v2" //init初始化 "github.com/micro/go-plugins/broker/mqtt/v2" )func main(){ service := micro.NewService( micro.Name("mqtt-client"), micro.Version("v1.0.0"), micro.Broker(mqtt.NewBroker()), ) service.Init() //获取代理 pubSub := service.Server().Options().Broker //获取链接 err := pubSub.Connect() if err !=nil { panic(err) } //消息订阅 subscriber,err := pubSub.Subscribe("mqtt-test-msg",func(event broker.Event)error{ //获取消息 msg := event.Message() //获取主题 topic := event.Topic() switch msg.Header["data_type"] { case "input": parasType1(topic,msg) case "struct": parasType2(topic,msg) } return nil }) if err!=nil{ fmt.Println(subscriber.Topic(),"主题订阅失败! ",err.Error()) }else { fmt.Println(subscriber.Topic(),"主题订阅成功!") } //运行服务 service.Run() defer func() { fmt.Println("client close conn and Unsubscribe") pubSub.Disconnect()//关闭链接 subscriber.Unsubscribe()//取消订阅 }() } func parasType1(topic string,msg *broker.Message){ fmt.Println("收到input消息:") fmt.Println("主题: ",topic) fmt.Println("Header:",msg.Header) fmt.Println("内容:",string(msg.Body),"\n")//[]byte -> string } func parasType2(topic string,msg *broker.Message){ //反序列化 解码数据 data := message.Student{} if err := proto.Unmarshal(msg.Body,&data); err!=nil{ panic(err) } fmt.Println("收到struct消息:") fmt.Println("主题: ",topic) fmt.Println("Header:",msg.Header) fmt.Println("内容:",data,"\n") }

6. pubsub.go 代码
package mainimport ( "fmt" "github.com/gogo/protobuf/proto" "github.com/micro/go-micro/v2" "github.com/micro/go-micro/v2/broker" "mqtt/message" "github.com/micro/go-plugins/broker/mqtt/v2" _ "github.com/micro/go-plugins/broker/mqtt/v2" //init初始化 ) func main() { //创建一个微服务 service := micro.NewService( micro.Name("mqtt-server"), micro.Version("v1.0.1"), micro.Broker(mqtt.NewBroker()),//实例化mqtt broker代理 ) service.Init() //获取代理实例 pubSub := service.Server().Options().Broker //获取链接 err := pubSub.Connect() if err !=nil { panic(err) } //消息发布 sendType1(pubSub,"input") //sendType2(pubSub,"struct") defer pubSub.Disconnect()//关闭链接 } func sendType1(pubSub broker.Broker,data_type string){ var msg string for{ fmt.Println("input:") fmt.Scan(&msg) if msg == "exit"{ break } data_msg := broker.Message{ Header:map[string]string{ "header-name":"publish", "header-version":"v1.0.0", "data_type":data_type, }, Body:[]byte(msg), } err := pubSub.Publish("mqtt-test-msg",&data_msg) if err!=nil{ fmt.Println("消息发布失败! ",err.Error()) }else { fmt.Println("消息发布成功!\n") } } } func sendType2(pubSub broker.Broker,data_type string){ student := &message.Student{Name:"ztind",Age:18,Class:"三年二班"} data,_ := proto.Marshal(student) data_msg := broker.Message{ Header:map[string]string{ "header-name":"publish", "header-version":"v1.0.0", "data_type":data_type, }, Body:data, } err := pubSub.Publish("mqtt-test-msg",&data_msg) if err!=nil{ fmt.Println("消息发布失败! ",err.Error()) }else { fmt.Println("消息发布成功!\n") } }

测试:
go run client.go go run client.go go run pubsub.go

go-micro v2 + mqtt 实现一个消息发布订阅系统
文章图片

源码下载:https://gitee.com/ztind/golang-micro/tree/master/go-micro/broker/mqtt

    推荐阅读