kafka中的exactly once semantics
kafka0.11.0.0版本正式支持精确一次处理语义(exactly once semantics,EOS),EOS主要体现在3个方面:
- 幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
- 事务:保证原子性地写入到多个分区,即写入到多个分区的消息要么全成功,要么失败回滚
- 流处理EOS:流处理本质上可看成是‘读取-处理-写入’的管道。此EOS保证整个过程的操作是原子性。注意,这只适用kafka streams。
- 启用幂等producer:在producer程序中设置属性enable.idempotence=true,注意不要设置transactional.id,不要设置成null也不要设置成空字符串
- 启用带伤支持:在producer程序中设置属性transcational.id为一个指定字符串,同时设置enable.idempotence=true
- 启用流式处理EOS:在kafka streams程序中设置processing.guarantee=exactly_once
producer幂等性 producer幂等性指的是当发送同一条消息时,数据在server端只会被持久化一次,数据不丢不重,但是这里的幂等是有条件的:
- 只能保证producer在单个会话内不丢不重,如果producer出现重启是无法保证幂等性的(在设置了幂等的情况下,是无法获取之前的状态信息的,因此无法做到跨会话级别的不丢不重)
- 幂等性不能跨多个topic-partition,只能保证单个partition内的幂等性,当涉及多个topic-partition时,这中间的状态并没有同步。
幂等示例
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all");
// 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
幂等实现原理 kafka proudcer在实现幂等时有两个重要机制:
- PID(Producer ID),用来标识每个producer client
- sequence numbers,client发送的每条消息都会带相应的sn,server端再根据这个值来判断数据是否重复
每个producer在初始化时都会被分配一个唯一的PID,这个PID对用户是透明的,没有暴露给用户。对于一个给定的PID,sequence number将会从0开始自增,每个topic-partition都会有一个独立的sequence number。producer在发送数据时,将会给每条消息标识一个sn,然后server以此来验重。这里的PID是全局唯一的,如果producer重启后会被分配一个新的PID,这也是幂等性无法做到跨会话的一个原因。
PID申请与管理
producer向broker发送一个请求获取PID(server端会选择一台连接数量最少的broker进行处理),broker收到申请PID的请求后,会尝试在ZK创建一个/latest_producer_id_block节点,每个broker向ZK申请一个PID段后,都会将自己申请的PID段信息写入到这个节点,这样当其他broker再申请PID段时,会首先读取这个节点的信息,然后根据block_end选择一个PID段,最后再将信息写回到ZK的这个节点里,这个节点信息内容如下:
{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}
【kafka之七-幂等性】broker与ZK交互流程:
- 先从ZK的/latest_producer_id_block节点读取最新已经分配的PID段信息
- 如果该节点不存在,直接从0开始分配,选择0-1000的PID段(PidBlockSize默认为1000)
- 如果该节点存在,读取其中数据,根据block_end选择这个PID段
- 在选择了相应的PID段后,将这个PID段信息写回到ZK的这个节点中,如果写入成功,证明PID段申请成功;如果失败证明此时可能其它的broker已经更新了这个节点,就需要从步骤1重新开始执行
client幂等时发送流程 java producer(区别于scala producer)是双线程设计,分为用户主线程与sender线程,前者调用send方法将消息写入到producer的内存缓冲区,即RecordAccumulator中,后者会定期从RecordAccumulator中获取消息并将消息归入不同的batch中发送到对应的broker上。在幂等producer中,用户主线程的逻辑变动不大。send方法依然是将消息写入到RecordAccumulator。而Sender线程却有着很大的改支。
- 用户线程调用send方法将数据添加到RecordAccumulator中,添加时会判断是否需要新建一个ProducerBatch,这时的ProducerBatch还是没有PID和sequence number
- sender线程在执行时,判断当前PID是否需要重置:如果有消息重试多次人失败最后因为超时而被移除,这时的seqeunce number有部分已经分配出去,那这是不允许发送的。
- sender线程阻塞获取PID
- 在ProducerBatch里设置相应的PID与sequence number,进行发送。
- 如果PID不存在,那么判断sequence number是否从0开始,是的话,在缓存中记录PID的meta信息(PID,epoch,sequence number),并执行写入操作,否则返回UnknowProducerIdException(PID在server端已经过期或这个PID写的数据已经过期,但producer还在接着上次的sn发送数据)
- 如果PID存在,先检查PID epoch与server端记录的是否相同
- 如果不同,且sn不是从0开始,那么返回OutOfOrderSequenceException异常
- 如果相同,那么根据缓存中记录的最近一次sn(currentLastSeq)检查是否为连接,不连接的情况下那么返回OutOfOrderSequenceException异常
- 每个producer会被分配一个PID,SN
- producer与broker端都有与SN的映射关系
- producer每发送一条消息后就将对应的分区序列号加一
- broker会比较序列号,如果new SN < old SN+1,说明是过期的,会抛弃这条数据;如果new SN > old SN+1,说明有消息丢失了,抛出异常。
Kafka 事务性之幂等性实现
Kafka幂等性及事务
关于Kafka幂等producer的讨论