Go|RabbitMQ系列笔记封装篇

导语
【Go|RabbitMQ系列笔记封装篇】在阅读本篇笔记时,如果你还不熟悉RabbitMQ,请查看公众号中关于RabbitMQ系列笔记相关文章,如果你已经熟悉了,还请在本篇文章多多指教。本文使用go mod进行获取相关包,使用Go1.12.6版本进行编写,编译器工具使用Vscode。
封装思路
首先为我们的RabbitMQ的工作模式用变量进行区别,分别代表工作模式、广播模式、路由模式、主题模式。

const ( SimpleQueueType = "SimpleQueue" BroadQueueType= "BroadQueue" DirectQueueType = "DirectQueue" TopicQueueType= "TopicQueue" )

定义一个客户端,该客户端包含连接,工作模式的类型,和所需队列的属性,我们使用这个结构体实现相关方法
type MsgClient struct { Conn *amqp.Connection Type string `json:"type"` //消息类型 Data string `json:"data"` //队列数据 }

定义一个接口,这里需要注意的是消费者里是一个函数,函数中的参数是将来我们获取的消息。
// 定义rabbitMQ的接口方法 type IMMessageClient interface { // 连接RabbitMQ,并获取连接 ConnectToRabbitmq(Connection string) // 发送消息 PublishToQueue(msg []byte) error // 消费消息 ConsumeFromQueue(handlerfunc func(d amqp.Delivery)) error }

定义我们的四种工作模式的结构体,这里需要注意的是,因为对于广播模式和路由模式属性基本相同,用了同一个结构体,Topic主题模式由于路由和绑定的路由可能不同,故单独分离了出来。
type SimpleQueue struct { Rout_keystring `json:"rout_key"`//路由 Queuestring `json:"queue"`//队列的名字 Is_persistent bool`json:"is_persistent"` //队列是否持久化 } type ComplexQueue struct { ExchangeNamestring `json:"exchangeName"` Rout_keystring `json:"rout_key"`//路由 Queuestring `json:"queue"`//队列的名字 Is_persistent bool`json:"is_persistent"` //队列是否持久化 }type TopicQueue struct { ExchangeNamestring `json:"exchangeName"` Rout_keystring `json:"rout_key"`//路由 Queuestring `json:"queue"`//队列的名字 Is_persistent bool`json:"is_persistent"` //队列是否持久化 Bind_keystring `json:"bind_key"`//绑定的路由 }

实现接口的相关方法,这里只是部分思路代码,如果想看封装源码,我已经上传到github上,有需要的可以直接拉取下来,当然也可以提交更好的代码到分支上。
//获取连接 func (m *MsgClient) ConnectToRabbitmq(Connection string) { var err error m.Conn, err = amqp.Dial(fmt.Sprintf("%s/", Connection)) if err != nil { log.Fatal(err) } } // 发消息时判断其类型,注意使用json进行反序列化 if m.Type == SimpleQueueType { var s SimpleQueue json.Unmarshal([]byte(m.Data), &s) q, err := ch.QueueDeclare( s.Queue,//name队列的名称 s.Is_persistent, //durble是否持久化 false,//delete when unused是否自动删除 false,//exclusive是否设置排他,如果设置为true,则队列仅对首次声明他的连接可见,并在连接断开的时候自动删除 false,//no-wait是否阻塞 nil,//arguments ) FailOnError(err, "队列申请失败") err = ch.Publish( "", q.Name, // 路由,即队列的名字 false,//mandatory false,//immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, //消息的持久化 ContentType:"text/plain", Body:msg, }, ) FailOnError(err, "发送消息失败") } //接收消息时需要绑定路由 // 队列绑定 err = ch.QueueBind( q.Name,//队列的名字 s.Rout_key,//routing key s.ExchangeName, //所绑定的交换器 false, nil, )

读取我们的消息
func consumeLoop(deliveries <-chan amqp.Delivery, handlerfunc func(d amqp.Delivery)) { for d := range deliveries { fmt.Println("有数据:", string(d.Body)) handlerfunc(d) } }

现在我们来测试一下吧
测试需要编写一个消费者收到消息后处理消息
func recive(d amqp.Delivery) { fmt.Println(string(d.Body)) d.Acknowledger.Ack(d.DeliveryTag, true) }

测试我们的work模式,这里为了持续测试,我们使用一个协程,并用http监听防止我们的程序退出。
simplequeue := client.NewSimpleQueue("user", "Login", true) body, _ := json.Marshal(simplequeue) fmt.Println(string(body)) Simple := &client.MsgClient{ Type: client.SimpleQueueType, Data: string(body), } body, _ = json.Marshal(Simple) fmt.Println(string(body)) Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672" go Simple.ConsumeFromQueue(recive) http.ListenAndServe("0.0.0.0:8200", nil)

输出进行了公平调度
Go|RabbitMQ系列笔记封装篇
文章图片

Go|RabbitMQ系列笔记封装篇
文章图片

测试我们的广播模式
broadqueue := client.NewComplexQueue("broadqueue_exchange", "broadqueue_route", "", true) body, _ := json.Marshal(broadqueue) Simple := &client.MsgClient{ Type: client.BroadQueueType, Data: string(body), } Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672") go Simple.ConsumeFromQueue(recive)

两个消费者同时收到了消息进行打印
Go|RabbitMQ系列笔记封装篇
文章图片

推荐阅读
  • 开发环境搭建(持续更新中)
  • RabbitMQ系列笔记介绍篇
  • Golang中Modle包的使用
  • goriila context深入学习笔记
  • Go Context深入学习笔记
  • 基于Nginx和Consul构建高可用及自动发现的Docker服务架构
  • 关于log日志的深入学习笔记
本文欢迎转载,转载请联系作者,谢谢!
  • 公众号【常更新】:陌无崖
  • GitHub:https://github.com/yuwe1
  • CSDN【看心情更新】: https://blog.csdn.net/weixin_40051278
  • 博客地址【定期更新】:https://mowuya.cn/
Go|RabbitMQ系列笔记封装篇
文章图片

    推荐阅读