架构师之路-如何建立高可用消息中间件kafka?( 三 )


架构师之路-如何建立高可用消息中间件kafka?

文章插图
Kafka提供了数据复制算法保证 。如果leader发生故障或挂掉 。一个新leader被选举并被接受客户端的消息成功写入 。Kafka确保从同步副本列表中选举一个副本为leader 。或者说follower追赶leader数据 。leader负责维护和跟踪ISR(In-Sync Replicas的缩写 。表示副本同步队列 。具体可参考下节)中所有follower滞后的状态 。当producer发送一条消息到broker后 。leader写入消息并复制到所有follower 。消息提交之后才被成功复制到所有的同步副本 。消息复制延迟受最慢的follower限制 。重要的是快速检测慢副本 。如果follower“落后”太多或者失效 。leader将会把它从ISR中删除 。
3.3 ISR
上节我们涉及到ISR (In-Sync Replicas) 。这个是指副本同步队列 。副本数对Kafka的吞吐率是有一定的影响 。但极大的增强了可用性 。默认情况下Kafka的replica数量为1 。即每个partition都有一个唯一的leader 。为了确保消息的可靠性 。通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1 。比如3 。所有的副本(replicas)统称为Assigned Replicas 。即AR 。
ISR是AR中的一个子集 。由leader维护ISR列表 。follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度) 。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表 。新加入的follower也会先存放在OSR中 。AR=ISR+OSR 。
Kafka 0.10.x版本后移除了replica.lag.max.messages参数 。只保留了replica.lag.time.max.ms作为ISR中副本管理的参数 。为什么这样做呢?replica.lag.max.messages表示当前某个副本落后leaeder的消息数量超过了这个参数的值 。那么leader就会把follower从ISR中删除 。假设设置replica.lag.max.messages=4 。那么如果producer一次传送至broker的消息数量都小于4条时 。因为在leader接受到producer发送的消息之后而follower副本开始拉取这些消息之前 。follower落后leader的消息数不会超过4条消息 。故此没有follower移出ISR 。所以这时候replica.lag.max.message的设置似乎是合理的 。
但是producer发起瞬时高峰流量 。producer一次发送的消息超过4条时 。也就是超过replica.lag.max.messages 。此时follower都会被认为是与leader副本不同步了 。从而被踢出了ISR 。但实际上这些follower都是存活状态的且没有性能问题 。那么在之后追上leader,并被重新加入了ISR 。于是就会出现它们不断地剔出ISR然后重新回归ISR 。这无疑增加了无谓的性能损耗 。而且这个参数是broker全局的 。设置太大了 。影响真正“落后”follower的移除;设置的太小了 。导致follower的频繁进出 。无法给定一个合适的replica.lag.max.messages的值 。故此 。新版本的Kafka移除了这个参数 。
注:ISR中包括:leader和follower 。
上面一节还涉及到一个概念 。即HW 。HW俗称高水位 。HighWatermark的缩写 。取一个partition对应的ISR中最小的LEO作为HW 。consumer最多只能消费到HW所在的位置 。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态 。对于leader新写入的消息 。consumer不能立刻消费 。leader会等待该消息被所有ISR中的replicas同步后更新HW 。此时消息才能被consumer消费 。这样就保证了如果leader所在的broker失效 。该消息仍然可以从新选举的leader中获取 。对于来自内部broKer的读取请求 。没有HW的限制 。
下图详细的说明了当producer生产消息至broker后 。ISR以及HW和LEO的流转过程:
架构师之路-如何建立高可用消息中间件kafka?

文章插图
由此可见 。Kafka的复制机制既不是完全的同步复制 。也不是单纯的异步复制 。事实上 。同步复制要求所有能工作的follower都复制完 。这条消息才会被commit 。这种复制方式极大的影响了吞吐率 。而异步复制方式下 。follower异步的从leader复制数据 。数据只要被leader写入log就被认为已经commit 。这种情况下如果follower都还没有复制完 。落后于leader时 。突然leader宕机 。则会丢失数据 。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率 。
Kafka的ISR的管理最终都会反馈到Zookeeper节点上 。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state 。目前有两个地方会对这个Zookeeper的节点进行维护:
Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller 。主要负责Partition管理和副本状态管理 。也会执行类似于重分配partition之类的管理任务 。在符合某些特定条件下 。Controller下的LeaderSelector会选举新的leader 。ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中 。同时发起LeaderAndIsrRequest通知所有的replicas 。

推荐阅读