优雅的故障处理(快速创建 Pulsar 重试队列)
关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性。
本文来自社区用户投稿,作者侯盛鑫,来自伴鱼。在很多在线的业务系统中,由于业务逻辑处理出现异常,一条消息没有被确认,我们需要尽可能准备好优雅地处理故障。重试是我们的常用做法,一般我们从以下三方面入手进行重试:
- 设置重新投递。若需要允许重新消费失败的消息,我们可以配置消费者同时允许消费消息从业务主题和重试主题,并配置了允许消费者自动重试。
- 设置重试队列。如果消息没有被消费成功,它将被保存到重试主题当中。并可以指定延时时间,自动重新消费重试主题里面的消费失败消息。
- 重试的次数限制。默认情况下,如果消费者没有成功消费一条消息(也就是说消费者无法 ack ),它将重试同一条消息。
如上图,Pulsar 采用非阻塞请求重试队列和死信队列(DLQ) 来扩展现有事件驱动架构作用,通过这样处理我们就可以在不中断实时流量的情况下实现解耦、可观察的错误处理。
但是 Pulsar 默认情况下,自动重试这个选项是关闭的,我们可以设置 enableRetry 选项为 true,这样可以在这个消费者中进行重试。如下例子所示,消费者会从重试主题消费消息:
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"time"
)
func main() {
cp := pulsar.ClientOptions{
URL: "pulsar://xxx.xxx.xxx.xxx:6650",
OperationTimeout: 30 * time.Second,
}
client, err := pulsar.NewClient(cp)
if err != nil {
return
}
defer client.Close()
d := &pulsar.DLQPolicy{
MaxDeliveries: 3,
RetryLetterTopic: "persistent://group/server/xxx-RETRY",
DeadLetterTopic: "persistent://group/server/xxx-DLQ",
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://group/server/xxx",
SubscriptionName: "test",
Type: pulsar.Failover,
RetryEnable: true,
DLQ: d,
NackRedeliveryDelay: time.Second * 3,
})
if err != nil {
return
}
ctx := context.Background()
for {
msg, err := consumer.Receive(ctx)
if err != nil {
return
}
if msg.Key() == 0 {
// 确认的处理
consumer.Ack(msg)
} else {
// 不确认,等 NackRedeliveryDelay 后将被重新投递到主队列进行消费
consumer.Nack(msg)
// 稍后处理,等 xx 秒后将被重新投递到重试队列
consumer.ReconsumeLater(msg, time.Second * 5)
// 以上方法二选其一
}
}
}
重试队列 首先,如上样例自动创建了一个重试队列,产生重试消息需要两个条件其中一个:
- Nack() 函数,消费者的 Nack() 函数用于确认处理单个消息失败。一旦消息被“否定确认”时,它将被标记为在之后重新传递。投递对象是当前的主 topic ,投递次数不受影响,投递时间受 NackRedeliveryDelay 控制。
- AckTimeout 参数,由于网络抖动,服务 Down 机等原因,未能及时 Nack,Pulsar 为了完善重试机制设置了 Acktimeout 默认为0(不开启的)的参数,consumer 处理一旦超过 Acktimeout 将被投递重试。(在 golang sdk v0.6.0 以及之前并没有实现设置 Acktimeout 的相关功能,之后请持续关注)
Golang 的 sdk 并没有完成 java sdk 中那样丰富多样的重试机制,但是却简单粗暴直接开放了 NackRedeliveryDelay 原始延迟时间的参数,这样方便了各种策略的定制化开发。
其中 DLQPolicy.MaxDeliveries 这个参数在消息出错时,将决定最多继续尝试发送多少次,如到用户设置的最大值,消息还没有成功发送,此时 Pulsar 会将消息推送到死信队列中,也就是 DLQPolicy.DeadLetterTopic 。
注意:??RLQ 是一个延迟队列,消费用 shared 模式!
死信队列 当重试次数用完时,信息将被路由到死信队列中,注意??:此时消息状态会变成已确认。死信队列是一个不分区的持久化队列,用户可以根据自己的需求对信息消息做相应的处理。sdk 提供 DLQPolicy.DeadLetterTopic 参数来设置 “死信队列” 的名字。默认情况下死信队列名称是 :{TopicName}-{Subscription}-DLQ 。
总结 到此为止,我们梳理一下流程:
1、除了正常消费写入的 topic 外重试还会增加一个重试队列,sdk 中会自动订阅重试队列;
2、重试队列实际上是一个延迟队列,未确认消息将维护一个时间相关的优先级队列;
3、当重试用完时,消息将进入死信队列,消息状态变为已确认,用户消费死信队列处理死信消息。
作者简介 我叫侯盛鑫,也可以我叫大云,目前就职于伴鱼基础架构,负责消息队列的维护与相关开发,Rust 日报小组中的菜鸡成员,喜欢研究存储,服务治理等方向。初次接触 Pulsar 就对存储和计算分离的结构所吸引,顺滑的生产者消费者接入和高吞吐让我好奇这个项目的实现,期望之后能在 Pulsar 的相关功能中做些贡献。
推荐阅读 【优雅的故障处理(快速创建 Pulsar 重试队列)】?博文推荐|深入解析Apache Pulsar 中的事务?Pulsar 2.8.0 新增特性概览:独占 Producer、事务等
?博文推荐|有效管理数据安全性—— Pulsar Schema 管理
推荐阅读
- 热闹中的孤独
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 放屁有这三个特征的,请注意啦!这说明你的身体毒素太多
- 一个人的旅行,三亚
- 布丽吉特,人生绝对的赢家
- 慢慢的美丽
- 尽力
- 一个小故事,我的思考。
- 家乡的那条小河
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量