基于rabbitmq实现的延时队列(golang版)

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的
实现延时队列的基本要素

  1. 存在一个倒计时机制:Time To Live(TTL)
  2. 当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)
    基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
    基于第二点,是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中
【基于rabbitmq实现的延时队列(golang版)】在这里例子当中,我使用的是 过期消息+转发指定exchange
在 golang 中的实现 首先是消费者comsumer.go
package mainimport ( "log""github.com/streadway/amqp" )func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }func main() { // 建立链接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close()// 声明一个主要使用的 exchange err = ch.ExchangeDeclare( "logs",// name "fanout", // type true,// durable false,// auto-deleted false,// internal false,// no-wait nil,// arguments ) failOnError(err, "Failed to declare an exchange")// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列 q, err := ch.QueueDeclare( "test_logs",// name false, // durable false, // delete when unused true,// exclusive false, // no-wait nil,// arguments ) failOnError(err, "Failed to declare a queue")/** * 注意,这里是重点!!!!! * 声明一个延时队列, ?我们的延时消息就是要发送到这里 */ _, errDelay := ch.QueueDeclare( "test_delay",// name false, // durable false, // delete when unused true,// exclusive false, // no-wait amqp.Table{ // 当消息过期时把消息发送到 logs 这个 exchange "x-dead-letter-exchange":"logs", },// arguments ) failOnError(errDelay, "Failed to declare a delay_queue")err = ch.QueueBind( q.Name, // queue name, 这里指的是 test_logs "",// routing key "logs", // exchange false, nil) failOnError(err, "Failed to bind a queue")// 这里监听的是 test_logs msgs, err := ch.Consume( q.Name, // queue name, 这里指的是 test_logs "",// consumer true,// auto-ack false,// exclusive false,// no-local false,// no-wait nil,// args ) failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }()log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }

然后是生产者productor.go
package mainimport ( "log" "os" "strings""github.com/streadway/amqp" )func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close()body := bodyFrom(os.Args) // 将消息发送到延时队列上 err = ch.Publish( "",// exchange 这里为空则不选择 exchange "test_delay",// routing key false,// mandatory false,// immediate amqp.Publishing{ ContentType: "text/plain", Body:[]byte(body), Expiration: "5000", // 设置五秒的过期时间 }) failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body) }func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }

go run comsumer.go go run productor.go

具体看代码和注释就行, 这里的关键点就是将要延时的消息发送到过期队列当中, 然后监听的是过期队列转发到的 exchange 下的队列 正常情况就是始终监听一个队列,然后把过期消息发送到延时队列中,当消息到达时间后就把消息发到正在监听的队列
本文转至:https://blog.justwe.site/post/go-rabbitmq-delay-queue/

    推荐阅读