聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)

一、前言 上一篇我们讲了 聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践,这一篇我们来说一说 Kafka 消息重复的场景以及最佳实践。
我们下面会从以下两个方面来说一下 Kafka 消息重复的场景以及最佳实践。

  • 生产者重复消息
  • 消费者重复消息
二、Kafka 消息重复的场景 2.1 生产者重复消息
2.1.1 根本原因 生产者发送的消息没有收到 Broker 正确的响应,导致生产者重试。
生产者发出一条消息,Broker 落盘以后因为网络等种种原因,发送端得到一个发送失败的响应或者网
络中断,然后生产者收到一个可恢复的 Exception 重试消息导致消息重复。
2.1.2 重试过程 【聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)】聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

主要流程:
  • new KafkaProducer() 后创建一个后台线程 KafkaThread 扫描 RecordAccumulator 中是否有消息;
  • 调用 KafkaProducer.send() 发送消息,实际上只是把消息保存到 RecordAccumulator 中;
  • 后台线程 KafkaThread 扫描到 RecordAccumulator 中有消息后,将消息发送到 Kafka 集群;
  • 如果发送成功,那么返回成功;
  • 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重
试,把消息再保存到 RecordAccumulator 中,等待后台线程 KafkaThread 扫描再次发送。
具体的重试流程见下图:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

2.1.3 可恢复异常说明 从下面源码中不难发现异常是 RetriableException 类型或者消息是事务类型 TransactionManager 允许重试;
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

常见的 RetriableException 类继承关系如下:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

2.1.4 记录顺序问题 如果设置 max.in.flight.requests.per.connection > 1(默认5,单个连接上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个 batch 发送到单个分区,第一个 batch 处理失败并重试,但是第二个 batch 处理成功,那么第二个 batch 处理中的记录可能先出现被消费。
设置 max.in.flight.requests.per.connection = 1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。
2.2 消费者重复消息
2.2.1 根本原因 数据消费完没有及时提交 offset 到 Broker
2.2.2 业务场景 消息消费端在消费过程中挂掉没有及时提交 offset 到 Broker,另一个消费端启动拿之前记录的 offset 开始消费,由于 offset 的滞后性可能会导致新启动的客户端有少量重复消费。
三、Kafka 的三种消息语义 上一篇我们提到 Kafka 的三种消息语义,有一种仅有一次传递语义,可以保证消息不会丢失,也不会被重复发送。
忘了的我这里再说一下:
  • 最多一次(At most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(At least once):消息不会丢失,但有可能被重复发送。
  • 仅有一次(Exactly once):消息不会丢失,也不会被重复发送。
那你可能就会问了,那就直接用这个 Exactly once 语义不就行了吗?
Kafka 支持的 Exactly once 和消息传递的服务质量标准 Exactly once 是不一样的。它是 Kafka 提供的另外一个特性,Kafka 中支持的事务也和我们通常意义理解的事务有一定的差异。在 Kafka 中,事务和 Excactly once 主要是为了配合流计算使用的特性。
刚开始我也以为 Kafka 像 MQTT 一样能保证消息传递的仅有一次语义,后来才发现需要自己去实现,不得不说下 Kafka 的营销手段还是不错的。
既然 Kafka 无法保证消息不重复,那就需要我们的消费代码能够接受“消息是可能会重复的”这一现状,然后,通过一些方法来消除重复消息对业务的影响。
Kafka 实际上通过两种机制来确保消息消费的精确一次:
  • 幂等性(Idempotence)
  • 事务(Transaction)
四、幂等性 4.1 什么是幂等性
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。
4.2 引入幂等性之前
Producer 向 Broker 发送消息,然后 Broker 将消息追加到消息流中后再给 Producer 返回 Ack 信号值。实现流程如下:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

生产中,会出现各种不确定的因素,比如在 Producer 在发送给 Broker 的时候出现网络异常。比如以下这种异常情况的出现:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

上图这种情况,当 Producer 第一次发送消息给 Broker 时,Broker 将消息 (x2,y2) 追加到了消息流中, 但是在返回 Ack 信号给 Producer 时失败了(比如网络异常) 。此时,Producer 端触发重试机制,将消息 (x2,y2) 重新发送给 Broker,Broker 接收到消息后,再次将该消息追加到消息流中,然后成功返回 Ack 信号给 Producer。这样下来,消息流中就被重复追加了两条相同的 (x2,y2) 的消息。
4.3 引入幂等性之后
Kafka为了实现幂等性,在 0.11.0 版本之后,它在底层设计架构中引入了ProducerIDSequenceNumber
  • ProducerID:在每个新的 Producer 初始化时,会被分配一个唯一的 ProducerID,这个 ProducerID 对客户端使用者是不可见的。
  • SequenceNumber:对于每个 ProducerID,Producer 发送数据的每个 Topic 和 Partition 都对应一个从 0 开始单调递增的 SequenceNumber 值。
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如 Broker 在发送 Ack 信号给 Producer 时出现网络异常,导致发送失败。异常情况如下图所示:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

当 Producer 发送消息 (x2,y2) 给 Broker 时,Broker 接收到消息并将其追加到消息流中。此时,Broker 返回 Ack 信号给 Producer 时,发生异常导致 Producer 接收 Ack 信号失败。对于 Producer 来说,会触发重试机制,将消息 (x2,y2) 再次发送,但是,由于引入了幂等性,在每条消息中附带PID(ProducerID) 和 SequenceNumber。相同的 PID 和 SequenceNumber 发送给 Broker,而之前 Broker 缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条 (x2,y2),不会出现重复发送的情况。
4.4 源码分析
直接看 org.apache.kafka.clients.producer.internals.Sender#run 方法,然后跟进 runOnce 方法:
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

细心地读者可能会问了,老周啊,你上面不是说的 SequenceNumber 么,怎么 ProducerIdAndEpoch 里是 epoch 啊?没错,老周这里的版本是 2.7.0 了,跟以前的老版本有点不一样,但意思是一样的,之前是序列号,现在的是代。
4.5 注意
我们上面说了在每个新的 Producer 初始化时,会分配一个 PID,消息发送到的每一个分区都有对应的代号,这些代号从 0 开始单调递增。生产者每发送一条消息就会将 对应的代号值加 1。
Broker 端在内存中为每一对 维护一个序列号 epoch_old。针对生产者发送来的每一条消息,对其代号 epoch_new 进行判断,并作相应处理。
  • 只有 epoch_new 比 epoch_old 大 1 时,即 epoch_new = epoch_old + 1 时,Broker 才会接受这条消息;
  • epoch_new < epoch_old + 1,说明消息被重复写入,Broker 直接丢弃该条消息;
  • epoch_new > epoch_old + 1,说明中间有数据尚未写入,出现了消息乱序,可能存在消息丢失的现象,对应的生产者会抛出 OutOfOrderSequenceException 异常。
代号针对 ,这意味着幂等生产者只能保证单个主题的单一分区内消息不重复;其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性,这里的会话即可以理解为:Producer 进程的一次运行。当重启了 Producer 进程之后,则幂等性保证就失效了。
那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
五、事务 幂等性不能实现多分区以及多会话上的消息无重复,而 Kafka 事务则可以弥补这个缺陷,Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。下面我们就来看看 Kafka 中的事务型 Producer。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
六、最佳实践 6.1 生产端
6.1.1 幂等性 Producer 幂等性 Producer 只适合单个主题的单一分区内消息不重复,其次,它只能实现单会话上的幂等性。
  • 设置 enable.idempotence=true,生产者将确保在流中准确地写入每个消息的一个副本。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片
  • 设置 acks = all。代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片
  • 设置 max.in.flight.requests.per.connection < 5,客户端将在单个连接上发送的未确认请求的最大数量。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片

    具体设置多少,可以参考上文 2.1.4 记录顺序问题,根据自己的业务情况来设置。
  • 设置 retries = 3,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片

    如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。
  • 设置 retry.backoff.ms = 300,合理估算重试的时间间隔,可以避免无效的频繁重试。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片

    它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
6.1.2 事务型 Producer 能实现多分区以及多会话上的消息无重复,即使进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
其它参数和上面的幂等性 Producer 参数一致,多加一个:
  • 设置 transactional.id。这就支持了跨多个生产者会话的可靠性语义,因为它允许客户端在启动任何新事务之前确保使用相同 TransactionalId 的事务已经完成,最好为其设置一个有意义的名字。
    聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
    文章图片
事务型 Producer 代码应该这样写:
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); }

和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 record1 和 record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。
6.1.3 不在乎数据丢失的场景 你可能会问了,我的系统存在这种场景,我不在乎数据的丢失,该怎么配呢?你不会让我配置上面那些幂等性、事务型的参数吧?比如我的可能是日志收集的系统,这会导致系统吞吐量严重降低。
别着急,针对这种场景也有有有相应的方案的,那上面的参数别管了,
  • 设置 ack=0,不需要 Broker 接收到消息的确认也不需要重试。
6.2 消费端
6.2.1 消费事务型 Producer 消息 设置 isolation.level=read_committed 参数的值即可
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
6.2.2 通用设置 确保消息消费完成再提交。最好把它设置成 enable.auto.commit = false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

6.2.3 下游消费端做幂等 6.2.3.1 利用数据库的唯一约束实现幂等 比如金融领域,有一张流水表,表里有三个字段:转账单 ID、账户 ID 和变更金额,对于每个转账单每个账户只可以执行一次变更操作,我们可以给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
6.2.3.2 设置前置条件 给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
不知道小伙伴们有没有发现,这个思想就是大名鼎鼎 CAS 乐观锁机制,其实很多高大上的东西并没有那么难,思想往往很简单。
6.2.3.3 记录并检查操作 如果上面两种方法都不适用你,这里还有一种通用性更强的方法,就是给每条消息都记录一个全局唯一 ID,消费时,先根据这个全局唯一 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
聊聊 Kafka(Kafka 消息重复的场景以及最佳实践)
文章图片

是不是感觉很简单?但里面的坑很大,说到底还是数据一致性不好保证,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。
好了,这就是 Kafka 消息重复的场景以及最佳实践的所有内容了,希望对你实践有所参考意义,我们下期再见。

    推荐阅读