深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)

编者荐语:
本文转载自腾讯云中间件公众号。作者林琳,Apache Pulsar PMC,腾讯云中间件专家工程师,《深入解析Apache Pulsar》作者。本系列第一篇文章为大家解读 Pulsar 中消息确认的模式。
导语 在 Apache Pulsar 中,为了避免消息的重复投递,消费者进行消息确认是非常重要的一步。当一条消息被消费者消费后,需要消费者发送一个 Ack 请求给 Broker,Broker 才会认为这条消息被真正消费掉。被标记为已经消费的消息,后续不会再次重复投递给消费者。在这篇文章中,我们会介绍 Pulsar 中消息确认的模式,以及正常消息确认在 Broker 侧是如何实现的。
确认消息的模式 在了解 Pulsar 消息确认模式之前,我们需要先了解一些前置知识 —— Pulsar 中的订阅以及游标(Cursor)。Pulsar 中有多种消费模式,如:Shared(共享)、Key_shared(键共享)、Failover(灾备) 等等,无论用户使用哪种消费模式都会创建一个订阅。订阅分为持久订阅和非持久订阅,对于持久订阅,Broker 上会有一个持久化的 Cursor,即 Cursor 的元数据被记录在 ZooKeeper。Cursor 以订阅(或称为消费组)为单位,保存了当前订阅已经消费到哪个位置了。因为不同消费者使用的订阅模式不同,可以进行的 Ack 行为也不一样。总体来说可以分为以下几种 Ack 场景:
1. 单条消息确认(Acknowledge) 和其他的一些消息系统不同,Pulsar 支持一个 Partition 被多个消费者消费。假设消息 1、2、3 发送给了 Consumer-A,消息 4、5、6 发送给了 Consumer-B,而 Consumer-B 又消费的比较快,先 Ack 了消息 4,此时 Cursor 中会单独记录消息 4 为已 Ack 状态。如果其他消息都被消费,但没有被 Ack,并且两个消费者都下线或 Ack 超时,则 Broker 会只推送消息 1、2、3、5、6,已经被 Ack 的消息 4 不会被再次推送。
2. 累积消息确认(AcknowledgeCumulative) 假设 Consumer 接受到了消息 1、2、3、4、5,为了提升 Ack 的性能,Consumer 可以不分别 Ack 5 条消息,只需要调用 AcknowledgeCumulative,然后把消息 5 传入,Broker 会把消息 5 以及之前的消息全部标记为已 Ack。
3. 累积消息确认(AcknowledgeCumulative) 这种消息确认模式,调用的接口和单条消息的确认一样,但是这个能力需要 Broker 开启配置项 AcknowledgmentAtBatchIndexLevelEnabled。当开启后,Pulsar 可以支持只 Ack 一个 Batch 里面的某些消息。假设 Consumer 拿到了一个批消息,里面有消息 1、2、3,如果不开启这个选项,我们只能消费整个 Batch 再 Ack,否则 Broker 会以批为单位重新全部投递一次。前面介绍的选项开启之后,我们可以通过 Acknowledge 方法来确认批消息中的单条消息。
4. 否定应答(NegativeAcknowledge) 客户端发送一个 RedeliverUnacknowledgedMessages命令给 Broker,明确告知 Broker,当前 Consumer 无法消费这条消息,消息将会被重新投递。
并不是所有的订阅模式下都能用上述这些 Ack 行为,例如:Shared 或者 Key_shared 模式下就不支持累积消息确认(AcknowledgeCumulative)。因为在 Shared 或者 Key_shared 模式下,前面的消息不一定是被当前 Consumer 消费的,如果使用 AcknowledgeCumulative,会把别人的消息也一起确认掉。订阅模式与消息确认之间的关系如下所示:
深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)
文章图片

Acknowledge 与 AcknowledgeCumulative 的实现 Acknowledge 与 AcknowledgeCumulative 接口不会直接发送消息确认请求给 Broker,而是把请求转交给 AcknowledgmentsGroupingTracker 处理。这是我们要介绍的 Consumer 里的第一个 Tracker,它只是一个接口,接口下有两个实现,一个是持久化订阅的实现,另一个是非持久化订阅的实现。由于非持久化订阅的 Tracker 实现都是空,即不做任何操作,因此我们只介绍持久化订阅的实现——PersistentAcknowledgmentsGroupingTracker。
在 Pulsar 中,为了保证消息确认的性能,并避免 Broker 接收到非常高并发的 Ack 请求,Tracker 中默认支持批量确认,即使是单条消息的确认,也会先进入队列,然后再一批发往 Broker。我们在创建 Consumer 时可以设置参数 AcknowledgementGroupTimeMicros,如果设置为 0,则 Consumer 每次都会立即发送确认请求。所有的单条确认(IndividualAck)请求会先放入一个名为 PendingIndividual Acks 的 Set,默认是每 100ms 或者堆积的确认请求超过 1000,则发送一批确认请求。
消息确认的请求最终都是异步发送出去,如果 Consumer 设置了需要回执(Receipt),则会返回一个 CompletableFuture,成功或失败都能通过 Future 感知到。默认都是不需要回执的,此时直接返回一个已经完成的 CompletableFuture。
对于 Batch 消息中的单条确认(IndividualBatchAck),用一个名为 PendingIndividualBatchIndexAcks的 Map 进行保存,而不是普通单条消息的 Set。这个 Map 的 Key 是 Batch 消息的 MessageId,Value 是一个 BitSet,记录这批消息里哪些需要 Ack。使用 BitSet 能大幅降低保存消息 Id 的能存占用,1KB 能记录 8192 个消息是否被确认。由于 BitSet 保存的内容都是 0 和 1,因此可以很方便地保存在堆外,BitSet 对象也做了池化,可以循环使用,不需要每次都创建新的,对内存非常友好。
如下图所示,只用了 8 位,就表示了 Batch 里面 8 条消息的 Ack 情况,下图表示 EntryId 为 0、2、5、6、7 的 Entry 都被确认了,确认的位置会被置为 1:
深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)
文章图片

对于累计确认(CumulativeAck)实现方式就更简单了,Tracker 中只保存最新的确认位置点即可。例如,现在 Tracker 中保存的 CumulativeAck 位置为 5:10,代表该订阅已经消费到 LedgerId=5,EntryId=10 的这条消息上了。后续又 Ack 了一个 5:20,则直接替换前面的 5:10 为 5:20 即可。
最后就是 Tracker 的 Flush,所有的确认最终都需要通过触发 Flush 方法发送到 Broker,无论是哪种确认,Flush 时创建的都是同一个命令并发送给 Broker,不过传参中带的 AckType 会不一样。
NegativeAcknowledge 的实现 否定应答和其他消息确认一样,不会立即请求 Broker,而是把请求转交给 NegativeAcksTracker 进行处理。Tracker 中记录着每条消息以及需要延迟的时间。Tracker 复用了 PulsarClient 的时间轮,默认是 33ms 左右一个时间刻度进行检查,默认延迟时间是 1 分钟,抽取出已经到期的消息并触发重新投递。Tracker 主要存在的意义是为了合并请求。另外如果延迟时间还没到,消息会暂存在内存,如果业务侧有大量的消息需要延迟消费,还是建议使用 ReconsumeLater 接口。NegativeAck 唯一的好处是,不需要每条消息都指定时间,可以全局设置延迟时间。
未确认消息的处理 如果消费者获取到消息后一直不 Ack 会怎么样?这要分两种情况,第一种是业务侧已经调用了 Receive 方法,或者已经回调了正在异步等待的消费者,此时消息的引用会被保存进 UnAckedMessageTracker,这是 Consumer 里的第三个 Tracker。UnAckedMessageTracker 中维护了一个时间轮,时间轮的刻度根据 AckTimeoutTickDurationInMs 这两个参数生成,每个刻度时间=AckTimeout / TickDurationInMs。新追踪的消息会放入最后一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保证刻度总数不变。每次调度,队列头刻度里的消息将会被清理,UnAckedMessageTracker 会自动把这些消息做重投递。
重投递就是客户端发送一个 RedeliverUnacknowledgedMessages命令给 Broker。每一条推送给消费者但是未 Ack 的消息,在 Broker 侧都会有一个集合来记录(PengdingAck),这是用来避免重复投递的。触发重投递后,Broker 会把对应的消息从这个集合里移除,然后这些消息就可以再次被消费了。注意,当重投递时,如果消费者不是 Share 模式是无法重投递单条消息的,只能把这个消费者所有已经接收但是未 ack 的消息全部重新投递。下图是一个时间轮的简单示例:
深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)
文章图片

另外一种情况就是消费者做了预拉取,但是还没调用过任何 Receive 方法,此时消息会一直堆积在本地队列。预拉取是客户端 SDK 的默认行为,会预先拉取消息到本地,我们可以在创建消费者时通过 ReceiveQueueSize 参数来控制预拉取消息的数量。Broker 侧会把这些已经推送到 Consumer 本地的消息记录为 PendingAck,并且这些消息也不会再投递给别的消费者,且不会 Ack 超时,除非当前 Consumer 被关闭,消息才会被重新投递。Broker 侧有一个 Redelivery Tracker 接口,暂时的实现是内存追踪(InMemoryRedeliveryTracker)。这个 Tracker 会记录消息到底被重新投递了多少次,每条消息推送给消费者时,会先从 Tracker 的哈希表中查询一下重投递的次数,和消息一并推送给消费者。
由上面的逻辑我们可以知道,创建消费者时设置的 Receive QueueSize 真的要慎重,避免大量的消息堆积在某一个 Consumer 的本地预拉取队列,而其他 Consumer 又没有消息可消费。
PulsarClient 上可以设置启用 ConsumerStatsRecorder,启用后,消费者会在固定间隔会打印出当前消费者的 metrics 信息,例如:本地消息堆积量、接受的消息数等,方便业务排查性能问题。
后记 Pulsar 中的设计细节非常多,由于篇幅有限,作者会整理一系列的文章进行技术分享,敬请期待。
关注公众号「Apache Pulsar」, 获取干货与动态
【深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)】加入 Apache Pulsar 中文交流群
深入解析|深入解析 Apache Pulsar 系列(一)(客户端消息确认)
文章图片

    推荐阅读