关于golang监听rabbitmq消息队列任务断线自动重连接的问题
golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务
需求背景:
goalng常驻内存任务脚本监听rbmq执行任务
【关于golang监听rabbitmq消息队列任务断线自动重连接的问题】任务脚本由supervisor来管理
文章图片
当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态
文章图片
文章图片
假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了
如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员
文章图片
如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。
文章图片
代码实现一:
消费者:
package mainimport ("fmt""github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq")type RecvPro struct {}//// 实现消费者 消费消息失败 自动进入延时尝试尝试3次之后入库db/*返回值 error 为nil则表示该消息消费成功否则消息会进入ttl延时队列重复尝试消费3次3次后消息如果还是失败 消息就执行失败进入告警 FailAction */func (t *RecvPro) Consumer(dataByte []byte) error {//time.Sleep(500*time.Microsecond)//return errors.New("顶顶顶顶")fmt.Println(string(dataByte))//time.Sleep(1*time.Second)return nil//消息已经消费3次 失败了 请进行处理如果消息 消费3次后 仍然失败此处可以根据情况 对消息进行告警提醒 或者 补偿入库db钉钉告警等等func (t *RecvPro) FailAction(err error,dataByte []byte) error {fmt.Println(err)fmt.Println("任务处理失败了,我要进入db日志库了")fmt.Println("任务处理失败了,发送钉钉消息通知主人")func main() {t := &RecvPro{}//rabbitmq.Recv(rabbitmq.QueueExchange{//"a_test_0001",//"",//"amqp://guest:guest@192.168.2.232:5672/",//},t,5)/*runNums: 表示任务并发处理数量一般建议 普通任务1-3就可以了*/err := rabbitmq.Recv(rabbitmq.QueueExchange{"a_test_0001","hello_go","direct","amqp://guest:guest@192.168.1.169:5672/",},t,4)if(err != nil){fmt.Println(err)}
rabbitmq代码
package rabbitmqimport ("errors""strconv""time"//"errors""fmt""github.com/streadway/amqp""log")// 定义全局变量,指针类型var mqConn *amqp.Connectionvar mqChan *amqp.Channel// 定义生产者接口type Producer interface {MsgContent() string}type RetryProducer interface {// 定义接收者接口type Receiver interface {Consumer([]byte)errorFailAction(error , []byte)error// 定义RabbitMQ对象type RabbitMQ struct {connection *amqp.ConnectionChannel *amqp.Channeldns stringQueueNamestring// 队列名称RoutingKeystring// key名称ExchangeName string// 交换机名称ExchangeType string// 交换机类型producerList []ProducerretryProducerList []RetryProducerreceiverList []Receiver// 定义队列交换机对象type QueueExchange struct {QuNamestring// 队列名称RtKeystring// key值ExNamestring// 交换机名称ExTypestring// 交换机类型Dnsstring//链接地址// 链接rabbitMQfunc (r *RabbitMQ)MqConnect() (err error){mqConn, err = amqp.Dial(r.dns)r.connection = mqConn// 赋值给RabbitMQ对象if err != nil {fmt.Printf("rbmq链接失败:%s \n", err)}return// 关闭mq链接func (r *RabbitMQ)CloseMqConnect() (err error){err = r.connection.Close()if err != nil{fmt.Printf("关闭mq链接失败:%s \n", err)func (r *RabbitMQ)MqOpenChannel() (err error){mqConn := r.connectionr.Channel, err = mqConn.Channel()//defer mqChan.Close()fmt.Printf("MQ打开管道失败:%s \n", err)return errfunc (r *RabbitMQ)CloseMqChannel() (err error){r.Channel.Close()// 创建一个新的操作对象func NewMq(q QueueExchange) RabbitMQ {return RabbitMQ{QueueName:q.QuName,RoutingKey:q.RtKey,ExchangeName: q.ExName,ExchangeType: q.ExType,dns:q.Dns,func (mq *RabbitMQ) sendMsg (body string) (err error){err = mq.MqOpenChannel()ch := mq.Channellog.Printf("Channel err:%s \n", err)defer mq.Channel.Close()if mq.ExchangeName != "" {if mq.ExchangeType == ""{mq.ExchangeType = "direct"}err =ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)if err != nil {log.Printf("ExchangeDeclare err:%s \n", err)// 用于检查队列是否存在,已经存在不需要重复声明_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)log.Printf("QueueDeclare err :%s \n", err)// 绑定任务if mq.RoutingKey != "" && mq.ExchangeName != "" {err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)log.Printf("QueueBind err :%s \n", err)if mq.ExchangeName != "" && mq.RoutingKey != ""{err = mq.Channel.Publish(mq.ExchangeName,// exchangemq.RoutingKey, // routing keyfalse,// mandatoryfalse,// immediateamqp.Publishing {ContentType: "text/plain",Body:[]byte(body),})}else{"",// exchangemq.QueueName, // routing key/*发送延时消息 */func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){err =mq.MqOpenChannel()returnif ttl <= 0{return errors.New("发送延时消息,ttl参数是必须的")table := make(map[string]interface{},3)table["x-dead-letter-routing-key"] = mq.RoutingKeytable["x-dead-letter-exchange"] = mq.ExchangeNametable["x-message-ttl"] = ttl*1000//fmt.Printf("%+v",table)//fmt.Printf("%+v",mq)ttlstring := strconv.FormatInt(ttl,10)queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)_, err = ch.QueueDeclare(queueName, true, false, false, false, table)returnif routingKey != "" && mq.ExchangeName != "" {err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)header := make(map[string]interface{},1)header["retry_nums"] = 0var ttl_exchange stringvar ttl_routkey stringif(mq.ExchangeName != "" ){ttl_exchange = mq.ExchangeNamettl_exchange = ""if mq.RoutingKey != "" && mq.ExchangeName != ""{ttl_routkey = routingKeyttl_routkey = queueNameerr = mq.Channel.Publish(ttl_exchange,// exchangettl_routkey, // routing keyfalse,// mandatoryfalse,// immediateamqp.Publishing {ContentType: "text/plain",Body:[]byte(body),Headers:header,})func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string){err :=mq.MqOpenChannel()//原始路由keyoldRoutingKey := args[0]//原始交换机名oldExchangeName := args[1]table["x-dead-letter-routing-key"] = oldRoutingKeyif oldExchangeName != "" {table["x-dead-letter-exchange"] = oldExchangeNamemq.ExchangeName = ""table["x-dead-letter-exchange"] = ""table["x-message-ttl"] = int64(20000)_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)header["retry_nums"] = retry_nums + int32(1)ttl_routkey = mq.RoutingKeyttl_routkey = mq.QueueName//fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)fmt.Printf("MQ任务发送失败:%s \n", err)// 监听接收者接收任务 消费者func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {// 获取消费通道,确保rabbitMQ一个一个发送消息err =ch.Qos(1, 0, false)msgList, err :=ch.Consume(mq.QueueName, "", false, false, false, false, nil)log.Printf("Consume err :%s \n", err)for msg := range msgList {retry_nums,ok := msg.Headers["retry_nums"].(int32)if(!ok){retry_nums = int32(0)// 处理数据err := receiver.Consumer(msg.Body)if err!=nil {//消息处理失败 进入延时尝试机制if retry_nums < 3{fmt.Println(string(msg.Body))fmt.Printf("消息处理失败 消息开始进入尝试ttl延时队列 \n")retry_msg(msg.Body,retry_nums,QueueExchange{mq.QueueName,mq.RoutingKey,mq.ExchangeName,mq.ExchangeType,mq.dns,})}else{//消息失败 入库dbfmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")receiver.FailAction(err,msg.Body)}err = msg.Ack(true)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)}else {// 确认消息,必须为falsefmt.Printf("消息消费ack失败 err :%s \n", err)//消息处理失败之后 延时尝试func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){//原始队列名称 交换机名称oldQName := queueExchange.QuNameoldExchangeName := queueExchange.ExNameoldRoutingKey := queueExchange.RtKeyif oldRoutingKey == "" || oldExchangeName == ""{oldRoutingKey = oldQNameif queueExchange.QuName != "" {queueExchange.QuName = queueExchange.QuName + "_retry_3"; if queueExchange.RtKey != "" {queueExchange.RtKey = queueExchange.RtKey + "_retry_3"; queueExchange.RtKey = queueExchange.QuName + "_retry_3"; //fmt.Printf("%+v",queueExchange)mq := NewMq(queueExchange)_ = mq.MqConnect()defer func(){_ = mq.CloseMqConnect()}()//fmt.Printf("%+v",queueExchange)mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)func Send(queueExchange QueueExchange,msg string) (err error){err = mq.MqConnect()mq.CloseMqConnect()err = mq.sendMsg(msg)//发送延时消息func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){err = mq.sendDelayMsg(msg,ttl)runNums开启并发执行任务数量func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){//链接rabbitMQif(err != nil){//rbmq断开链接后 协程退出释放信号taskQuit:= make(chan struct{}, 1)//尝试链接rbmqtryToLinkC := make(chan struct{}, 1)//开始执行任务for i:=1; i<=runNums; i++{go Recv2(mq,receiver,taskQuit); //如果rbmq断开连接后 尝试重新建立链接var tryToLink = func() {for {err = mq.MqConnect()if(err == nil){tryToLinkC <- struct{}{}breaktime.Sleep(time.Second * 10)for{select {case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接go tryToLink()<-tryToLinkC //建立链接成功后 重新开启协程执行任务fmt.Println("重新开启新的协程执行任务")go Recv2(mq,receiver,taskQuit); time.Sleep(time.Millisecond*100)func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){defer func() {fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")taskQuit <- struct{}{}}()// 验证链接是否正常err := mq.MqOpenChannel()if(err != nil){mq.ListenReceiver(receiver)type retryPro struct {msgContentstring
实现重连方式很多,下面实现方式比较简单
文章图片
1.Recv方法创建ampq链接
2.启动协程开始执行任务
MqOpenChannel 打开一个channel通道处理amqp消息
拿到消息 处理任务
3,协程中捕获异常发送消息到taskQuit <- struct{}{}
4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功
5,重新链接成功后启动新的协程处理任务
主要代码分析:
/*runNums开启并发执行任务数量 */func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){mq := NewMq(queueExchange)//链接rabbitMQerr = mq.MqConnect()if(err != nil){return}//rbmq断开链接后 协程退出释放信号taskQuit:= make(chan struct{}, 1)//尝试链接rbmqtryToLinkC := make(chan struct{}, 1)//开始执行任务for i:=1; i<=runNums; i++{go Recv2(mq,receiver,taskQuit); //如果rbmq断开连接后 尝试重新建立链接var tryToLink = func() {for {err = mq.MqConnect()if(err == nil){tryToLinkC <- struct{}{}break}time.Sleep(time.Second * 10)}for{select {case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接go tryToLink()<-tryToLinkC //建立链接成功后 重新开启协程执行任务fmt.Println("重新开启新的协程执行任务")go Recv2(mq,receiver,taskQuit); time.Sleep(time.Millisecond*100)}func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){defer func() {fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")taskQuit <- struct{}{}return}()// 验证链接是否正常err := mq.MqOpenChannel()if(err != nil){mq.ListenReceiver(receiver)
到此这篇关于golang监听rabbitmq消息队列任务断线自动重连接的文章就介绍到这了,更多相关golang rabbitmq断线自动重连内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- Vue中使用watch同时监听多个值的实现方法
- 关于非易失性存储器FRAM的常见问题解答
- golang监听rabbitmq消息队列任务断线自动重连接
- golang线程安全
- 关于Java错误提示之找不到或无法加载主类的问题及正确处理方法
- [Golang]力扣Leetcode—中级算法—其他—两整数之和(位运算)
- 关于 CentOS 迁移龙蜥操作系统,这里有一份详细指南,请查收!
- 【第三十二期】春招 Golang实习面经 七牛
- 关于require('mkdirp')创建文件夹
- Golang|Golang 小数操作之判断几位小数点与四舍五入