golang监听rabbitmq消息队列任务断线自动重连接
需求背景:
【golang监听rabbitmq消息队列任务断线自动重连接】goalng常驻内存任务脚本监听rbmq执行任务
任务脚本由supervisor来管理
文章图片
当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态
文章图片
文章图片
假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了
如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员
文章图片
如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。
文章图片
代码实现一:
消费者:
文章图片
文章图片
package mainimport ( "fmt" "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq" )type RecvPro struct {}//// 实现消费者 消费消息失败 自动进入延时尝试尝试3次之后入库db /* 返回值 error 为nil则表示该消息消费成功 否则消息会进入ttl延时队列重复尝试消费3次 3次后消息如果还是失败 消息就执行失败进入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { //time.Sleep(500*time.Microsecond) //return errors.New("顶顶顶顶") fmt.Println(string(dataByte)) //time.Sleep(1*time.Second) //return errors.New("顶顶顶顶") return nil }//消息已经消费3次 失败了 请进行处理 /* 如果消息 消费3次后 仍然失败此处可以根据情况 对消息进行告警提醒 或者 补偿入库db钉钉告警等等 */ func (t *RecvPro) FailAction(err error,dataByte []byte) error { fmt.Println(string(dataByte)) fmt.Println(err) fmt.Println("任务处理失败了,我要进入db日志库了") fmt.Println("任务处理失败了,发送钉钉消息通知主人") return nil }func main() { t := &RecvPro{}//rabbitmq.Recv(rabbitmq.QueueExchange{ //"a_test_0001", //"a_test_0001", //"", //"", //"amqp://guest:guest@192.168.2.232:5672/", //},t,5)/* runNums: 表示任务并发处理数量一般建议 普通任务1-3就可以了 */ err := rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.1.169:5672/", },t,4)if(err != nil){ fmt.Println(err) }}
View Code rabbitmq代码
文章图片
文章图片
package rabbitmqimport ( "errors" "strconv" "time"//"errors" "fmt" "github.com/streadway/amqp" "log" )// 定义全局变量,指针类型 var mqConn *amqp.Connection var mqChan *amqp.Channel// 定义生产者接口 type Producer interface { MsgContent() string }// 定义生产者接口 type RetryProducer interface { MsgContent() string }// 定义接收者接口 type Receiver interface { Consumer([]byte)error FailAction(error , []byte)error }// 定义RabbitMQ对象 type RabbitMQ struct { connection *amqp.Connection Channel *amqp.Channel dns string QueueNamestring// 队列名称 RoutingKeystring// key名称 ExchangeName string// 交换机名称 ExchangeType string// 交换机类型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver }// 定义队列交换机对象 type QueueExchange struct { QuNamestring// 队列名称 RtKeystring// key值 ExNamestring// 交换机名称 ExTypestring// 交换机类型 Dnsstring//链接地址 }// 链接rabbitMQ func (r *RabbitMQ)MqConnect() (err error){mqConn, err = amqp.Dial(r.dns) r.connection = mqConn// 赋值给RabbitMQ对象if err != nil { fmt.Printf("rbmq链接失败:%s \n", err) }return }// 关闭mq链接 func (r *RabbitMQ)CloseMqConnect() (err error){err = r.connection.Close() if err != nil{ fmt.Printf("关闭mq链接失败:%s \n", err) } return }// 链接rabbitMQ func (r *RabbitMQ)MqOpenChannel() (err error){ mqConn := r.connection r.Channel, err = mqConn.Channel() //defer mqChan.Close() if err != nil { fmt.Printf("MQ打开管道失败:%s \n", err) } return err }// 链接rabbitMQ func (r *RabbitMQ)CloseMqChannel() (err error){ r.Channel.Close() if err != nil { fmt.Printf("关闭mq链接失败:%s \n", err) } return err }// 创建一个新的操作对象 func NewMq(q QueueExchange) RabbitMQ { return RabbitMQ{ QueueName:q.QuName, RoutingKey:q.RtKey, ExchangeName: q.ExName, ExchangeType: q.ExType, dns:q.Dns, } }func (mq *RabbitMQ) sendMsg (body string) (err error){ err = mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err:%s \n", err) }defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err =ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err:%s \n", err) } }// 用于检查队列是否存在,已经存在不需要重复声明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 绑定任务 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } }if mq.ExchangeName != "" && mq.RoutingKey != ""{ err = mq.Channel.Publish( mq.ExchangeName,// exchange mq.RoutingKey, // routing key false,// mandatory false,// immediate amqp.Publishing { ContentType: "text/plain", Body:[]byte(body), }) }else{ err = mq.Channel.Publish( "",// exchange mq.QueueName, // routing key false,// mandatory false,// immediate amqp.Publishing { ContentType: "text/plain", Body:[]byte(body), }) } return}/* 发送延时消息 */ func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){ err =mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err:%s \n", err) } defer mq.Channel.Close()if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err =ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { return } }if ttl <= 0{ return errors.New("发送延时消息,ttl参数是必须的") }table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = mq.RoutingKey table["x-dead-letter-exchange"] = mq.ExchangeName table["x-message-ttl"] = ttl*1000//fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) // 用于检查队列是否存在,已经存在不需要重复声明 ttlstring := strconv.FormatInt(ttl,10) queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) _, err = ch.QueueDeclare(queueName, true, false, false, false, table) if err != nil { return } // 绑定任务 if routingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil) if err != nil { return } }header := make(map[string]interface{},1)header["retry_nums"] = 0var ttl_exchange string var ttl_routkey stringif(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName }else{ ttl_exchange = "" }if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = routingKey }else{ ttl_routkey = queueName }err = mq.Channel.Publish( ttl_exchange,// exchange ttl_routkey, // routing key false,// mandatory false,// immediate amqp.Publishing { ContentType: "text/plain", Body:[]byte(body), Headers:header, }) if err != nil { return} return }func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string){ err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err:%s \n", err) } defer mq.Channel.Close()if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err =ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err:%s \n", err) } }//原始路由key oldRoutingKey := args[0] //原始交换机名 oldExchangeName := args[1]table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = oldRoutingKey if oldExchangeName != "" { table["x-dead-letter-exchange"] = oldExchangeName }else{ mq.ExchangeName = "" table["x-dead-letter-exchange"] = "" }table["x-message-ttl"] = int64(20000)//fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) // 用于检查队列是否存在,已经存在不需要重复声明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 绑定任务 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } }header := make(map[string]interface{},1)header["retry_nums"] = retry_nums + int32(1)var ttl_exchange string var ttl_routkey stringif(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName }else{ ttl_exchange = "" }if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = mq.RoutingKey }else{ ttl_routkey = mq.QueueName }//fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey) err = mq.Channel.Publish( ttl_exchange,// exchange ttl_routkey, // routing key false,// mandatory false,// immediate amqp.Publishing { ContentType: "text/plain", Body:[]byte(body), Headers:header, }) if err != nil { fmt.Printf("MQ任务发送失败:%s \n", err)}}// 监听接收者接收任务 消费者 func (mq *RabbitMQ) ListenReceiver(receiver Receiver) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err:%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err =ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err:%s \n", err) } }// 用于检查队列是否存在,已经存在不需要重复声明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 绑定任务 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } // 获取消费通道,确保rabbitMQ一个一个发送消息 err =ch.Qos(1, 0, false) msgList, err :=ch.Consume(mq.QueueName, "", false, false, false, false, nil) if err != nil { log.Printf("Consume err :%s \n", err) } for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) } // 处理数据 err := receiver.Consumer(msg.Body) if err!=nil { //消息处理失败 进入延时尝试机制 if retry_nums < 3{ fmt.Println(string(msg.Body)) fmt.Printf("消息处理失败 消息开始进入尝试ttl延时队列 \n") retry_msg(msg.Body,retry_nums,QueueExchange{ mq.QueueName, mq.RoutingKey, mq.ExchangeName, mq.ExchangeType, mq.dns, }) }else{ //消息失败 入库db fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n") receiver.FailAction(err,msg.Body) } err = msg.Ack(true) if err != nil { fmt.Printf("确认消息未完成异常:%s \n", err) } }else { // 确认消息,必须为false err = msg.Ack(true)if err != nil { fmt.Printf("消息消费ack失败 err :%s \n", err) } }} }//消息处理失败之后 延时尝试 func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){ //原始队列名称 交换机名称 oldQName := queueExchange.QuName oldExchangeName := queueExchange.ExName oldRoutingKey := queueExchange.RtKey if oldRoutingKey == "" || oldExchangeName == ""{ oldRoutingKey = oldQName }if queueExchange.QuName != "" { queueExchange.QuName = queueExchange.QuName + "_retry_3"; }if queueExchange.RtKey != "" { queueExchange.RtKey = queueExchange.RtKey + "_retry_3"; }else{ queueExchange.RtKey = queueExchange.QuName + "_retry_3"; }//fmt.Printf("%+v",queueExchange)mq := NewMq(queueExchange) _ = mq.MqConnect()defer func(){ _ = mq.CloseMqConnect() }() //fmt.Printf("%+v",queueExchange) mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)}func Send(queueExchange QueueExchange,msg string) (err error){ mq := NewMq(queueExchange) err = mq.MqConnect() if err != nil{ return }defer func(){ mq.CloseMqConnect() }()err = mq.sendMsg(msg)return }//发送延时消息 func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){ mq := NewMq(queueExchange) err = mq.MqConnect() if err != nil{ return } defer func(){ _ = mq.CloseMqConnect() }() err = mq.sendDelayMsg(msg,ttl) return }/* runNums开启并发执行任务数量 */ func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){ mq := NewMq(queueExchange) //链接rabbitMQ err = mq.MqConnect() if(err != nil){ return } //rbmq断开链接后 协程退出释放信号 taskQuit:= make(chan struct{}, 1) //尝试链接rbmq tryToLinkC := make(chan struct{}, 1) //开始执行任务 for i:=1; i<=runNums; i++{ go Recv2(mq,receiver,taskQuit); }//如果rbmq断开连接后 尝试重新建立链接 var tryToLink = func() { for { err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break } time.Sleep(time.Second * 10) } } for{ select { case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接 go tryToLink() <-tryToLinkC //建立链接成功后 重新开启协程执行任务 fmt.Println("重新开启新的协程执行任务") go Recv2(mq,receiver,taskQuit); } time.Sleep(time.Millisecond*100) } }func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} return }() // 验证链接是否正常 err := mq.MqOpenChannel() if(err != nil){ return } mq.ListenReceiver(receiver) }type retryPro struct { msgContentstring }
View Code
实现重连方式很多,下面实现方式比较简单
文章图片
- Recv方法创建ampq链接
- 启动协程开始执行任务
- MqOpenChannel 打开一个channel通道处理amqp消息
- 拿到消息 处理任务
4,主进程监听taskQuit管道 开始尝试重新链接amqp直到链接成功
5,重新链接成功后启动新的协程处理任务
主要代码分析:
/* runNums开启并发执行任务数量 */ func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){ mq := NewMq(queueExchange) //链接rabbitMQ err = mq.MqConnect() if(err != nil){ return } //rbmq断开链接后 协程退出释放信号 taskQuit:= make(chan struct{}, 1) //尝试链接rbmq tryToLinkC := make(chan struct{}, 1) //开始执行任务 for i:=1; i<=runNums; i++{ go Recv2(mq,receiver,taskQuit); }//如果rbmq断开连接后 尝试重新建立链接 var tryToLink = func() { for { err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break } time.Sleep(time.Second * 10) } } for{ select { case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接 go tryToLink() <-tryToLinkC //建立链接成功后 重新开启协程执行任务 fmt.Println("重新开启新的协程执行任务") go Recv2(mq,receiver,taskQuit); } time.Sleep(time.Millisecond*100) } }func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} return }() // 验证链接是否正常 err := mq.MqOpenChannel() if(err != nil){ return } mq.ListenReceiver(receiver) }
推荐阅读
- golang线程安全
- [Golang]力扣Leetcode—中级算法—其他—两整数之和(位运算)
- 【第三十二期】春招 Golang实习面经 七牛
- Golang|Golang 小数操作之判断几位小数点与四舍五入
- Golang|Golang []int []string 互转与判断字符是否在数组中
- Spring|[Spring手撸专栏学习笔记]——容器事件和事件监听器
- [Golang]力扣Leetcode—中级算法—数学—Pow(x, n)(分治算法)
- 【golang】leetcode专项训练-数组与切片
- [Golang]力扣Leetcode—中级算法—数学—Excel表列序号
- golang中的单元测试