Kafka服务端之KafkaController

[TOC]
在上一节对副本机制的实现进行了分析,其中提到Broker能够处理来自KafkaController的LeaderAndIsrRequest、StopReplicaRequest、UpdateMetadataRequest等请求。本节将介绍KafkaController在集群中扮演的角色以及KafkaController与各个Broker之间如何协同工作。
在Kafka集群的多个Broker中,有一个Broker会被选举为Controller Leader,负责管理整个集群中所有的分区和副本的状态。例如:当某分区的Leader副本出现故障时,由Controller负责为该分区重新选举新的Leader副本;当使用kafka-topics脚本增加某Topic的分区数量时,由Controller管理分区的重新分配;当检测到分区的ISR集合发生变化时,由Controller通知集群中所有的Broker更新其MetadataCache信息。
为了实现Controller的高可用,一个Broker被选为Leader之后,其他的Broker都会成为Follower(不加特殊说明的情况下,本节的“Leader/Follower”指的都是KafkaController的Leader/Follower,请读者不要与副本机制中的Leader副本和Follower副本混淆),会从剩下的Follower中选出新的Controller Leader来管理集群。
选举Controller Leader依赖于ZooKeeper实现,每个Broker启动时都会创建一个KafkaController对象,但是集群中只能存在一个Controller Leader来对外提供服务。在集群启动时,多个Broker上的KafkaController会在指定路径下竞争创建节点,只有第一个成功创建节点的KafkaController才能成为Leader,而其余的KafkaController则成为Follower。当Leader出现故障后,所有的Follower会收到通知,再次竞争在该路径下创建节点从而选出新的Leader。这也是ZooKeeper的一种常见用法。
在Kafka早期版本中并没有采用KafkaController的设计来对分区和副本状态进行管理,而是依赖于ZooKeeper的Watcher和队列。在早期版本的设计中,每个Broker都会在ZooKeeper上注册Watcher,ZooKeeper上就会出现大量Watcher,当分区或副本状态变化时会唤醒很多不必要的Watcher,这种严重依赖ZooKeeper的设计出现了 “脑裂”、“羊群效应”以及ZooKeeper集群过载的情况。在新版本设计中,只有Controller Leader在ZooKeeper上注册Watcher,其他Broker几乎不用再监听ZooKeeper中的数据变化。旧版本中Broker之间传递事件依赖于ZooKeeper的设计比较低效,在新版设计中Controller Leader直接与Broker交互。旧版本的设计毕竟已经废弃,本节不做过多介绍。需要读者了解的是,在设计分布式系统时要适度依赖ZooKeeper集群,合理利用ZooKeeper Watcher,否则就会出现上述问题。
我们先通过图4-55了解ZooKeeper中与KafkaController相关的路径以及该路径中记录的内容的含义。

  • ·/brokers/ids/[id]:记录了集群中可用Broker的id。
  • ·/brokers/topics/[topic]/partitions:记录了一个Topic中所有分区的分配信息以及AR集合信息。
  • ·/brokers/topics/[topic]/partitions/[partition_id]/state:记录了某Partition的Leader副本所在BrokerId、lead_epoch、ISR集合、ZKVersion等信息。
  • ·/controller_epoch:记录了当前Controller Leader的年代信息。
  • ·/controller:记录了当前Controller Leader的Id,也用于Controller Leader的选举。
  • ·/admin/reassign_partitions:记录了需要进行副本重新分配的分区。
  • ·/admin/preferred_replica_election:记录了需要进行“优先副本”选举的分区。“优先副本”是在创建分区时为其指定的第一个副本。
  • ·/admin/delete_topics:记录了待删除的Topic。
  • ·/isr_change_notification:记录了一段时间内ISR集合发生变化的分区。
  • ·/config:记录了一些配置信息。
Kafka服务端之KafkaController
文章图片
image.png 在详细介绍KafkaController的相关组件之前,先从整体上了解KafkaController的设计,以及组件之间的依赖关系如图4-56所示。
Kafka服务端之KafkaController
文章图片
image.png
  • ·KafkaController组织并封装了其他组件,对外提供API接口。
  • ·ZookeeperLeaderElector主要用于Controller Leader的选举。
  • ·ControllerContextKafkaController的上下文信息,缓存了ZooKeeper中记录的整个集群的元信息,例如,可用Broker、全部的Topic、分区、副本的信息。
  • ·ControllerChannelManager维护了Controller Leader与集群中其他Broker之间的网络连接,是管理整个集群的基础。
  • ·TopicDeletionManager用于对指定的Topic进行删除。
  • ·PartitionStateMachine用于管理集群中所有Partition状态的状态机。
  • ·ReplicaStateMachine用于管理集群中所有副本状态的状态机。
  • ·ControllerBrokerRequestBatch实现了向Broker批量发送请求的功能。
  • ·*PartitionLeaderSelector实现了多种Leader副本选举策略。
  • ·*Listener是ZooKeeper上的监听器,实现了对ZooKeeper上某些节点中的数据、子节点或ZooKeeper Session状态的监听,被触发后调用相应的业务逻辑。
从另一个角度来看,KafkaController是ZooKeeper与Kafka集群交互的桥梁:它一方面对ZooKeeper进行监听,其中包括Broker写入到ZooKeeper中的数据,也包括管理员使用脚本写入的数据;另一方面根据ZooKeeper中数据的变化做出相应的处理,通过LeaderAndIsrRequest、StopReplicaRequest、UpdateMetadataRequest等请求控制每个Broker的工作。而且,KafkaController本身也通过ZooKeeper提供了高可用的机制。通过上述组件之间的协调工作,构成了一个统一的整体。
ControllerChannelManager
Controller Leader通过发送多种请求管理集群中的其他Broker,KafkaController使用ControllerChannelManager管理其与集群中各个Broker之间的网络交互。ControllerChannelManager中使用ControllerBrokerStateInfo类表示与一个Broker连接的各种信息。ControllerBrokerStateInfo的定义如下:
Kafka服务端之KafkaController
文章图片
image.png RequestSendThread继承了ShutdownableThread,在线程停止之前会循环执行doWork()方法,通过NetworkClientBlockingOps完成发送请求并阻塞等待响应。
Kafka服务端之KafkaController
文章图片
image.png ControllerChannelManager的核心字段是brokerStateInfo(HashMap[Int,ControllerBrokerStateInfo]类型),用于管理集群中每个Broker对应的ControllerBrokerStateInfo对象。其初始化过程如下:
Kafka服务端之KafkaController
文章图片
image.png ControllerChannelManager.addNewBroker()方法和removeBroker()方法实现了对brokerStateInfo集合的管理,sendRequest()方法向指定Broker发送请求。
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png ControllerContext
ControllerContext中维护了Controller使用到的上下文信息,从其构造函数也能猜到,ControllerContext与ZooKeeper有密切的关系,也可以将ControllerContext看作ZooKeeper数据的缓存。
Kafka服务端之KafkaController
文章图片
image.png ControllerContext中各个字段的含义和作用如下所述。
  • ·controllerChannelManager:管理Controller与集群中Broker之间的连接。
  • ·shuttingDownBrokerIds:正在关闭的BrokerId集合。
  • ·epoch:Controller的年代信息,初始为0。Controller的年代信息存储的ZK路径是“/controller_epoch”。每次重新选举新的Leader Controller,epoch字段值就会增加1。
  • ·epochZkVersion:年代信息的ZK版本,初始为0。
  • ·allTopics:整个集群中全部的Topic名称。
  • ·partitionReplicaAssignment:Map[TopicAndPartition, Seq[Int]]类型,记录了每个分区的AR集合。
  • ·partitionLeadershipInfo:Map[TopicAndPartition, LeaderIsrAndControllerEpoch]类型,记录了每个分区的Leader副本所在的BrokerId、ISR集合以及controller_epoch等信息。其中LeaderIsrAndControllerEpoch的定义如下
Kafka服务端之KafkaController
文章图片
image.png
  • ·partitionBeingReassigned:Map[TopicAndPartition, ReassignedPartitionsContext]类型,记录了正在重新分配副本的分区。该集合的value是ReassignedPartitionsContext类型,其中封装了新分配的AR集合信息以及用于监听ISR集合变化的ReassignedPa rtitionsIsrChangeListener,其定义如下:
Kafka服务端之KafkaController
文章图片
image.png
  • ·partitionsUndergoingPreferredReplicaElection:Set[TopicAndPartition]类型,记录了正在进行“优先副本”选举的分区。
  • ·liveBrokersUnderlying:Set[Broker]类型,记录了当前可用的Broker集合。
  • ·liveBrokerIdsUnderlying:Set[Int]类型,记录了当前可用的BrokerId集合。
ControllerContext为liveBrokersUnderlying字段、liveBrokerIdsUnderlying字段和shuttingDownBrokerIds字段提供了相关的集合操纵方法。
  • ·partitionsOnBroker:获取在指定Broker中存在有副本的分区集合。
  • ·replicasOnBrokers :获取指定Broker集合中保存的所有副本。
  • ·replicasForTopic:获取指定Topic的所有副本。
  • ·partitionsForTopic:获取指定Topic的所有分区
  • ·allLiveReplicas:获取所有可用Broker中保存的副本
  • ·replicasForPartition:获取指定分区集合的副本。
  • ·removeTopic:删除指定Topic。
ControllerBrokerRequestBatch
为了提高Controller Leader与集群中其他Broker的通信效率,KafkaController使用ControllerBrokerRequestBatch组件实现批量发送请求的功能。
ControllerBrokerRequestBatch的核心字段如下所述。
  • ·leaderAndIsrRequestMap:Map [Int,Map[TopicPartition, PartitionStateInfo]]类型,记录了发往指定Broker的LeaderAndIsrRequest所需的信息,其中PartitionStateInfo的定义如下:
Kafka服务端之KafkaController
文章图片
image.png
  • ·stopReplicaRequestMap:Map[Int, Seq[StopReplicaRequestInfo]]类型,记录了发往指定Broker的StopReplicaRequest所需的信息,其中StopReplicaRequestInfo的定义如下:
Kafka服务端之KafkaController
文章图片
image.png
  • ·updateMetadataRequestMap:Map [Int,Map[TopicPartition, PartitionStateInfo]]类型,记录了发往指定Broker的UpdateMetadataRequest集合。
ControllerBrokerRequestBatch的常规用法如下:

Kafka服务端之KafkaController
文章图片
image.png ControllerBrokerRequestBatch.newBatch()方法会检测三个请求集合是否为空,如果不为空则抛出异常。ControllerBrokerRequestBatch.clear()方法则会清空三个请求集合。
ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers()方法会向leaderAndIsrRequestMap集合中添加待发送的LeaderAndIsrRequest所需的数据,同时会调用addUpdateMetadataRequestForBrokers()方法准备向集群中所有可用的Broker发送UpdateMetadataRequest。
Kafka服务端之KafkaController
文章图片
image.png ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers()方法的代码如下:

Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png addStopReplicaRequestForBrokers()方法会向stopReplicaRequestMap集合中添加StopReplicaRequest所需的数据,具体实现与上述两个add*RequestForBroker()类似,不再赘述。
ControllerBrokerRequestBatch.sendRequestsToBrokers()方法会使用上述三个集合中的数据来创建相应的请求,并添加到ControllerChannelManager中对应的messageQueue队列中,最终由RequestSendThread线程将请求发送出去。
Kafka服务端之KafkaController
文章图片
image.png PartitionStateMachine
PartitionStateMachine是Controller Leader用于维护分区状态的状态机。分区的状态是通过PartitionState接口定义的,它有四个子类分别代表了分区四种可能的状态,如表4-2所示。
Kafka服务端之KafkaController
文章图片
image.png 分区各个PartitionState之间的转换如图4-57所示。

Kafka服务端之KafkaController
文章图片
image.png 下面分析各个状态之间转换时,需要完成的相关操作。
  • ·NonExistentPartition→NewPartition
    从ZooKeeper中加载分区的AR集合到ControllerContext的partitionReplicaAssignment集合中。
  • ·NewPartition→OnlinePartition
    首先将Leader副本和ISR集合的信息写入到ZooKeeper中,这里会将分区的AR集合中第一个可用的副本选举为Leader副本,并将分区的所有可用副本作为ISR集合。之后,向所有可用的副本发送LeaderAndIsrRequest,指导这些副本进行Leader/Follower的角色切换,并向所有可用的Broker发送UpdateMetadataRequest来更新其上的MetadataCache
  • ·OnlinePartitio/OfflinePartition→OnlinePartition
为分区选择新的Leader副本和ISR集合,并将结果写入ZooKeeper。之后,向需要进行角色切换的副本发送LeaderAndIsrRequest,指导这些副本进行Leader/Follower的角色切换,并向所有可用的Broker发送UpdateMetadataRequest来更新其上的MetadataCache。
  • ·NewPartition,OnlinePartition→OfflinePartition
    只进行状态转换,并没有其他的操作。
  • ·OfflinePartition→NonExistentPartition
    只进行状态转换,并没有其他的操作。
PartitionStateMachine中的各个字段含义和作用如下所述。
  • ·controllerContext:ControllerContext对象,用于维护KafkaController的上下文信息。
  • ·zkUtils:ZooKeeper的客户端,用于与ZooKeeper服务器交互。
  • ·partitionState:Map[TopicAndPartition, PartitionState]类型,记录了每个分区对应的PartitionState状态。
  • ·brokerRequestBatch:ControllerBrokerRequestBatch对象,用于向指定的Broker批量发送请求。
  • noOpPartitionLeaderSelector:默认的Leader副本选举类器,继承了PartitionLeaderSelector。NoOpLeaderSelector实现并没有真正进行Leader副本的选举,其实现是返回当前的Leader副本、ISR集合和AR集合。关于PartitionLeaderSelector的其他实现,下文详述。
  • ·topicChangeListener:ZooKeeper的监听器,用于监听Topic的变化。
  • ·deleteTopicsListener:ZooKeeper的监听器,用于监听Topic的删除。
  • ·partitionModificationsListeners:用于监听分区的修改。
关于ZooKeeper监听器的相关介绍,下文详述。
PartitionStateMachine启动时会对partitionState集合进行初始化,并调用triggerO nlinePartitionStateChange()方法将NewPartition和OfflinePartition状态的分区转换成OnlinePartition状态
Kafka服务端之KafkaController
文章图片
image.png 每个分区初始状态的依据是controllerContext.partitionLeadershipInfo中记录的Leader副本信息和ISR集合信息。
Kafka服务端之KafkaController
文章图片
image.png PartitionStateMachine.handleStateChange()方法是管理分区状态的核心方法,该方法控制着PartitionState的转换。这里需要注意该方法的第三个参数,它指定了用来选举Leader副本的PartitionLeaderSelector对象。
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png PartitionState由NewPartition切换为OnlinePartition时,调用了 initializeLeaderAndIsrF orPartition()方法,其操作的主要步骤是:
(1)从ControllerContext.partitionReplicaAssignment集合中选择第一个可用的副本作为Leader副本,其余的副本构成ISR集合。
(2)将Leader副本和ISR集合的信息写入到ZooKeeper。
(3)更新ControllerContext.partitionLeadershipInfo中缓存的Leader副本、ISR集合等信息。
(4)将上述步骤中确定的Leader副本、ISR集合、AR集合等信息添加到ControllerBrokerRequestBatch,之后会封装成LeaderAndIsrRequest发送给相关的Broker。
PartitionStateMachine.initializeLeaderAndIsrForPartition()方法的具体实现如下:
Kafka服务端之KafkaController
文章图片
image.png 当PartitionState由OfflinePartition或OnlinePartition切换为OnlinePartition时调用了electLeaderForPartition()方法,其操作的主要步骤是:
(1)使用指定的PartitionLeaderSelector为分区选举新的Leader副本。
(2)将Leader副本和ISR集合的信息写入到Zookeeper。
(3)更新ControllerContext.partitionLeadershipInfo集合中缓存的Leader副本、ISR集合等信息。
(4)将上述步骤中确定的Leader副本、ISR集合、AR集合等信息添加到ControllerBrokerRequestBatch,之后会封装成LeaderAndIsrRequest发送给相关的Broker。


PartitionStateMachine.electLeaderForPartition()方法的具体实现如下:! Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 在handleStateChange()方法中对于目标分区状态为NewPartition、OfflinePartition、NonExistentPartition的处理比较简单,只是进行了状态切换,并未进行其他处理。有的读者会说,当PartitionState由NonExistentPartition转换为NewPartition时,并没有从ZooKeeper中加载Partition的AR集合的相关操作。这是因为在调用handleStateChange()的方法中已经完成了此操作,我们以创建Topic的过程为例。当创建Topic时触发TopicChangeListener这个监听器,它会调用handleStateChange()完成PartitionState由NonExistentPartition到NewPartition的切换,调用关系如图4-58所示。

Kafka服务端之KafkaController
文章图片
image.png 在TopicChangeListener中会完成从ZooKeeper加载Partition的AR集合的操作。

Kafka服务端之KafkaController
文章图片
image.png handleStateChange()方法是个private方法,由PartitionStateMachine.handleStateChanges()方法和triggerOnlinePartitionStateChange()方法调用,对外提供PartitionState切换。handleStateChanges()方法对指定的分区集合循环调用handleStateChange()方法进行状态转换。
Kafka服务端之KafkaController
文章图片
image.png triggerOnlinePartitionStateChange()方法对partitionState集合中的全部分区进行遍历,将OfflinePartition和NewPartition状态的分区转换成OnlinePartition状态。状态切换成功的分区即可对外提供服务。
Kafka服务端之KafkaController
文章图片
image.png PartitionLeaderSelector
通过对前面的分析可知,PartitionMachine将Leader副本选举、确定ISR集合的工作委托给了PartitionLeaderSelector接口实现,PartitionMachine可以专注于管理分区状态。这是策略模式的一种典型的应用场景。
图4-59展示了PartitionLeaderSelector的实现类,这五个不同的实现提供了不同的策略。PartitionLeaderSelector接口的定义如下:

Kafka服务端之KafkaController
文章图片
image.png NoOpLeaderSelector是其中最简单的实现,它并没有进行Leader选举,而是将currentLeaderAndIsr直接返回,需要接收LeaderAndIsrRequest的Broker则是分区的AR集合
OfflinePartitionLeaderSelector会根据currentLeaderAndIsr选举新的Leader和ISR集合,策略如下:
1)如果在ISR集合中存在至少一个可用的副本,则从ISR集合中选择新的Leader副本,当前ISR集合为新ISR集合。
(2)如果ISR集合中没有可用的副本且“Unclean leader election”配置被禁用,那么就抛出异常。
(3)如果“Unclean leader election”被开启,则从AR集合中选择新的Leader副本和ISR集合。
(4)如果AR集合中没有可用的副本,抛出异常。
OfflinePartitionLeaderSelector.selectLeader()方法的具体实现如下:
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 对于剩余的PartitionLeaderSelector实现,这里只介绍其策略,具体的实现代码留给读者自己分析。PreferredReplicaPartitionLeaderSelector的策略是:如果“优先副本”可用且在ISR集合中,则选取其为Leader副本,当前的ISR集合为新的ISR集合,并向AR集合中所有可用副本发送LeaderAndIsrRequest,否则会抛出异常。
ReassignedPartitionLeaderSelector涉及到副本的重新分配,副本重新分配的相关概念后面详细分析,这里先简单了解ReassignedPartitionLeaderSelector的策略:选取的新Leader副本必须在新指定的AR集合中且同时在当前ISR集合中,当前ISR集合为新ISR集合,接收LeaderAndIsrRequest的副本是新指定的AR集合中的副本。
ControlledShutdownLeaderSelector的策略是:从当前ISR集合中排除正在关闭的副本后作为新的ISR集合,从新ISR集合中选择新的Leader,需要向AR集合中可用的副本发送LeaderAndIsrRequest。
ReplicaStateMachine
ReplicaStateMachine是Controller Leader用于维护副本状态的状态机。副本状态由ReplicaState接口表示,它有七个子类,分别代表了副本的七种不同的状态,如表4-3所示。
Kafka服务端之KafkaController
文章图片
image.png ReplicaState之间的转换如图4-60所示。下面介绍各个ReplicaState状态之间转换时需要完成的相关操作。
  • ·NonExistentReplica→NewReplica
Controller向此副本所在Broker发送LeaderAndIsrRequest,并向集群中所有可用的Broker发送 UpdateMetadataRequest。
  • ·NewReplica→OnlineReplica
    Controller将NewReplica加入到AR集合中
  • ·OnlineReplica,OfflineReplica→OnlineReplica
Controller向此副本所在的Broker发送LeaderAndIsrRequest,并向集群中所有可用的Broker发送UpdateMetadataRequest。
  • ·NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible →OfflineReplica Controller向副本所在Broker发送StopReplicaRequest,之后会从ISR集合中清除此副本,最后向其他可用副本所在的Broker发送LeaderAndIsrRequest,并向集群中所有可用的Broker发送UpdateMetadataRequest。
  • ·OfflineReplica→ReplicaDeletionStarted
    Controller向副本所在Broker发送StopReplicaRequest。
  • ·ReplicaDeletionStarted→ReplicaDeletionSuccessful
    只做状态转换,并没有其他操作。
  • ·ReplicaDeletionStarted→ReplicaDeletionIneligible
    只做状态转换,并没有其他操作。
  • ·ReplicaDeletionSuccessful→NonExistentReplica
    Controller从AR集合中删除此副本。
在ReplicaStateMachine中也有controllerContext、zkUtils、brokerRequestBatch字段,它们的功能与PartitionStateMachine中的同名字段相同,ReplicaStateMachine剩余的字段如下所述。
  • ·replicaState:Map[PartitionAndReplica, ReplicaState]类型,记录每个副本对应的ReplicaState状态。
  • ·brokerChangeListener:ZooKeeper的监听器,用于监听Broker的变化,例如Broker宕机或重新上线等事件。关于ZooKeeper监听器的相关介绍,下文详述。
ReplicaStateMachine启动时会对replicaState集合进行初始化,并调用handleStateChanges()方法尝试将可用副本转换为OnlineReplica状态。

Kafka服务端之KafkaController
文章图片
image.png 设置每个副本的初始状态的依据是controllerContext.partitionLeadershipInfo中记录的Broker状态。ReplicaStateMachine.initializeReplicaState()方法如下:
Kafka服务端之KafkaController
文章图片
image.png ReplicaStateMachine的核心方法是handleStateChange()方法,其中控制着ReplicaState的转换。

Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png ReplicaStateMachine.handleStateChanges()方法对指定的副本集合循环调用handleStateChange()方法来完成状态转换,与PartitionStateMachine中的实现类似,代码不贴出来了。
ZooKeeper Listener I0Itec-zkClient是一款ZooKeeper客户端工具,它并没有像Apache Curator那样实现高级的功能,而是提供了简单易用的API来实现一些常见的功能。I0Itec-zkClient像大多数ZooKeeper客户端框架一样实现了断线重连。它还提供了方便使用的监听器,避免了手动反复注册Watcher的烦琐操作,下文会介绍KafkaController中的多个I0Itec-zkClient Listener实现。I0Itec还对ZooKeeper的异常和序列化做了简单封装。此处不对I0IteczkClient的使用展开详述,感兴趣的读者可以参考相关文档进行学习。
Listener接口介绍
KafkaController会通过ZooKeeper监控整个Kafka集群的运行状态,响应管理员指定的相关操作。具体的实现方式是在ZooKeeper的指定节点上添加Listener,监听此节点中的数据变化或是其子节点的变化,从而触发相应的业务逻辑。
Listener按照接口的类型可以分为三类,如表4-4所示

Kafka服务端之KafkaController
文章图片
image.png IZkDataListener接口的定义如下:
[图片上传失败...(image-eab1f6-1613633270244)]
IZkChildListener接口中的定义如下:

Kafka服务端之KafkaController
文章图片
image.png IZkStateListener接口的定义如下:

Kafka服务端之KafkaController
文章图片
image.png Kafka中提供了五个IZkDataListener接口的实现,它们分别是:LeaderChangeListener、PartitionModificationsListener、PreferredReplicaElectionListener、PartitionsReassignedListener、ReassignedPartitionsIsrChangeListener。如图4-61所示。
Kafka服务端之KafkaController
文章图片
image.png Kafka中提供了四个IZkChildListener接口的实现,它们分别是:DeleteTopicsListener、TopicChangeListener、IsrChangeNotificationListener、BrokerChangeListener,如图4-62所示。
Kafka服务端之KafkaController
文章图片
image.png Kafka中只有了一个IZkStateListener接口的实现,如图4-63所示。
Kafka服务端之KafkaController
文章图片
image.png TopicChangeListener
TopicChangeListener负责管理Topic的增删,它监听“/brokers/topics”节点的子节点变化。其具体逻辑如下:
Kafka服务端之KafkaController
文章图片
image.png 在TopicChangeListener.onNewTopicCreation()方法中还会为每个新增的Topic注册一个PartitionModificationsListener,然后调用onNewPartitionCreation()方法完成新增Topic的分区状态以及副本状态转换。
Kafka服务端之KafkaController
文章图片
image.png 为了便于读者理解,这里举例解释TopicChangeListener的功能。现在假设有三个Broker,管理人员通过kafka-topics脚本添加了一个名为“test”的Topic,它有三个分区,每个分区有三个副本。
kafka-topics脚本向ZooKeeper的“/brokers/topics/test”节点写入的信息是"partitions": {"0": [0,1, 2],"1": [1,2,0],"2": [2,1,0]},此时触发TopicChangeListener。TopicChangeListener将test中每个Partition的AR集合加载到ControllerContext中,在进行第一次分区状态转换(NoExistentPartition→NewPartition)和第一次副本状态切换(NoExistentReplica→NewReplica)时,只做了状态切换并没有发送任何请求。进行第二次分区状态转换(NewPartition→OnlinePartition)时会选取Leader副本和ISR集合信息,结果为如表4-5所示。
Kafka服务端之KafkaController
文章图片
image.png 之后会将此结果写入ZooKeeper,向所有可用Broker发送LeaderAndIsrRequest来指导副本的角色切换,然后向所有可用Broker发送UpdateMetadataRequest来更新其MetadataCache。第二次副本状态切换(NewReplica→OnlineReplica)时,副本已在AR集合中,所以并未做任何操作。
TopicDeletionManager与DeleteTopicsListener
在开始介绍删除Topic 的实现之前,先来了解一下TopicDeletionManager的功能和实现。在TopicDeletionManager中维护了多个集合,用于管理待删除的Topic和不可删除的集合,它会启动一个DeleteTopicsThread线程来执行删除Topic的具体逻辑。
当Topic满足下列三种情况之一时不能被删除:
(1)如果Topic中的任一分区正在重新分配副本,则此Topic不能被删除。
(2)如果Topic中的任一分区正在进行“优先副本”选举,则此Topic不能被删除。
(3)如果Topic中的任一分区的任一副本所在的Broker宕机,则此Topic不能被删除。
TopicDeletionManager中各个字段的含义和功能如下所示。
  • ·partitionStateMachine:用于管理分区状态的状态机。
  • ·replicaStateMachine:用于管理副本状态的状态机。
  • ·topicsToBeDeleted:Set[String]类型,用于记录将要被删除的Topic集合,由TopicDeletionManager的构造器参数initialTopicsToBeDeleted指定其初始化值。
  • ·partitionsToBeDeleted:Set[TopicAndPartition]类型,用于记录将要被删除的分区集合
  • ·topicsIneligibleForDeletion:Set[String]类型,用于记录不可删除的Topic集合,由TopicDeletionManager的构造器参数initialTopicsIneligibleForDeletion指定其初始化值。
  • ·deleteTopicStateChanged:AtomicBoolean类型,用于标识Topic删除操作是否开始。
  • ·deleteTopicsThread:DeleteTopicsThread类型,用于删除Topic的后台线程。
  • ·isDeleteTopicEnabled:配置项delete.topic.enable的值,用于指定是否支持删除Topic。
  • ·deleteTopicsCond:Condition对象,用于其他线程与deleteTopicsThread线程同步。
在TopicDeletionManager启动时,会调用start()方法进行初始化。它会根据isDeleteTopicEnabled字段决定是否启动DeleteTopicsThread线程,如果此时topicsToBeDeleted集合不为空,则DeleteTopicsThread可以开始进行Topic删除的相关操作,并将deleteTopicStateChanged字段设置为true。
DeleteTopicsListener被触发后通过enqueueTopicsForDeletion()将待删除的Topic放入topicsToBeDeleted集合,将待删除的Topic的分区集合放入partitionsToBeDeleted集合,并唤醒DeleteTopicsThread处理。
Kafka服务端之KafkaController
文章图片
image.png DeleteTopicsThread是真正执行Topic删除操作的线程,它继承了ShutdownableThread,入口方法是doWork()方法。删除Topic的步骤如下:
(1)获取待删除Topic的分区集合,构成UpdateMetadataRequest发送给所有的Broker,将Broker中MetadataCache的相关信息删除。这些分区不再对外提供服务。
(2)调用onPartitionDeletion()方法开始对指定分区进行删除。
a)将不可用副本转换成ReplicaDeletionIneligible状态。
b)将可用副本转换成OfflineReplica状态。此步骤会发送StopReplicaRequest到待删除的副本(不会删除副本),同时还会向可用的Broker发送LeaderAndIsrRequest和UpdateMetadataRequest,将副本从ISR集合中删除。
c)将可用副本由OfflineReplica转换成ReplicaDeletionStarted。此步骤会向可用副本发送StopReplicaRequest(删除副本)。注意,这里设置了回调函数处理StopReplicaResponse。
(3)调用deleteTopicStopReplicaCallback()回调函数处理StopReplicaResponse。
a)如果StopReplicaResponse中的错误码表示出现异常,则将副本状态转换为ReplicaDeletionIneligible,并标记此副本所在Topic不可删除,也就是将Topic添加到topicsIneligibleForDeletion队列,最后唤醒DeleteTopicsThread线程。
b)如果StopReplicaResponse正常,则将副本状态转换为ReplicaDeletionSuccessful,并唤醒DeleteTopicsThread线程。
(4)经过上述三个步骤后,开始第二次doWork()调用。如果待删除的Topic的所有副本已经处于ReplicaDeletionSuccessful状态,调用completeDeleteTopic()方法完成Topic的删除。
a)取消partitionModificationsListeners监听。
b)将此Topic的所有副本从ReplicaDeletionSuccessful转换为NonExistentReplica。此步骤会将副本对应的Replica对象从ControllerContext中删除。
c)将Topic的所有分区转换为OfflinePartition状态,紧接着会再转换为NonExistentPartition。
d)将Topic和相关的分区从topicsToBeDeleted集合和partitionsToBeDeleted集合中删除。
e)删除ZooKeeper以及ControllerContext中与此Topic相关的全部信息。
(5)如果还有副本处于ReplicaDeletionStarted状态,则表示还没有收到StopReplicaResponse,则继续等待。
(6)如果Topic的任一副本处于ReplicaDeletionIneligible状态,则表示此Topic不能被删除,调用markTopicForDeletionRetry()将处于ReplicaDeletionIneligible状态的副本重新转换成OfflineReplica状态。此步骤的相关操作在步骤(2)→b中已经详细描述,这里不再赘述。
DeleteTopicsThread.doWork()方法的具体实现如下:

Kafka服务端之KafkaController
文章图片
image.png isTopicEligibleForDeletion()方法根据下面三个条件判断Topic能否开始删除操作。
Kafka服务端之KafkaController
文章图片
image.png onTopicDeletion()方法的核心是向所有可用的Broker发送UpdateMetadataRequest,注意其leader字段为LeaderDuringDelete,通知它们指定的Topic要被删除,并删除MetadataCache中与此Topic相关的缓存信息。
Kafka服务端之KafkaController
文章图片
image.png onPartitionDeletion()方法直接调用了startReplicaDeletion()方法,在startReplicaDeletion()方法中开始对副本进行删除。
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png deleteTopicStopReplicaCallback()回调函数中会调用failReplicaDeletion()方法处理异常副本,调用completeReplicaDeletion()方法处理返回正常StopReplicaResponse的副本。
Kafka服务端之KafkaController
文章图片
image.png 回到doWork()方法继续分析,在步骤4中调用completeDeleteTopic()方法对成功删除的Topic进行处理。

Kafka服务端之KafkaController
文章图片
image.png 在步骤6中调用markTopicForDeletionRetry()方法处理不可删除的Topic,它会将处于ReplicaDeletionIneligible状态的副本重新转换成OfflineReplica状态。
Kafka服务端之KafkaController
文章图片
image.png 在前面我们介绍了三种Topic不可删除的情况,在DeleteTopicsThread线程的执行过程中也有涉及。当Topic不再满足这三种情况时会通过resumeDeletionForTopics()方法从topicsIneligibleForDeletion集合中将其移除,并唤醒DeleteTopicsThread线程进行上述删除操作。图4-64是其被调用的位置,依次对应本节开始描述的三种情况。
Kafka服务端之KafkaController
文章图片
image.png 介绍完TopicDeletionManager的相关实现后再来对DeleteTopicsListener进行分析。DeleteTopicsListener会监听ZooKeeper中“/admin/delete_topics”节点下的子节点变化,当TopicCommand在该路径下添加需要被删除的Topic时,DeleteTopicsListener会被触发,它会将该待删除的Topic交由TopicDeletionManager执行Topic删除操作。下面是DeleteTopicsListener.handleChildChange()方法的具体实现:
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png PartitionModificationsListener
在上一节介绍的Topic删除过程中涉及PartitionModificationsListener的注册和取消。在新增Topic时会为每个Topic注册一个PartitionModificationsListener,在成功删除Topic之后会将注册的PartitionModificationsListener删除。PartitionModificationsListener会监听“/brokers/topics/[topic_name]”节点中的数据变化,主要用于监听一个Topic的分区变化。


PartitionModificationsListener.handleDataChange()方法的实现如下:! Kafka服务端之KafkaController
文章图片
image.png onNewPartitionCreation()方法在TopicChangeListener中分析过,此处不再赘述。需要注意,PartitionModificationsListener并不对分区的删除进行处理,在第5章中分析kafkatopics脚本时可以看到,是不能减少Topic的分区数量的。
BrokerChangeListener
BrokerChangeListener是ReplicaStateMachine中唯一的ZooKeeper Listener,它会监听“/brokers/ids”节点下的子节点变化,主要负责处理Broker的上线和故障下线。当Broker上线时会在“/brokers/ids”下创建临时节点,下线时会删除对应的临时节点。
BrokerChangeListener.handleChildChange()方法的实现如下:

Kafka服务端之KafkaController
文章图片
image.png KafkaController.onBrokerFailure()方法对故障Broker的处理步骤如下:
(1)将Leader副本分布在故障Broker上的分区转换为OfflinePartition状态。
(2)将OfflinePartition状态的分区转换为OnlinePartition状态。此步会使用OfflinePartitionLeaderSelector为其选取Leader副本和ISR集合并写入ZooKeeper,之后发送LeaderAndIsrRequest和UpdateMetadataRequest。
3)将故障Broker上的副本转换为OfflineReplica状态。此步会向故障Broker发送StopReplicaRequest,从ISR集合中清除相关副本,并发送LeaderAndIsrRequest和UpdateMetadataRequest。
(4)检查故障Broker上是否存在待删除Topic的副本,如果存在,则将其副本转换为ReplicaDeletionIneligible状态并标记Topic不可删除。
(5)如果步骤1中没有分区的Leader副本在故障Broker上,则上述步骤中可能不会发送UpdateMetadataRequest,这里向可用Broker发送UpdateMetadataRequest。
KafkaController.onBrokerFailure()方法的具体实现如下:
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 这里举例说明onBrokerFailure()方法的功能:现在假设Broker0上分布了Partition0、1、2的各一个副本,其中Partition0的副本为Leader副本,其余两个副本在对应的ISR集合中。当Broker0发生故障下线时,ZooKeeper中的“/brokers/ids/0”临时节点会被删除,并触发BrokerChangeListener进行处理。首先,将Broker0从ControllerContext的Broker列表中删除。然后,将Partition0转换为OfflinePartition状态,紧接着再将其转换成OnlinePartition状态,此时会使用OfflinePartitionLeaderSelector为其选举新的Leader副本和ISR集合并更新到ZooKeeper中,随后发送LeaderAndIsrRequest和UpdateMetaRequest。之后,将三个副本转换成OfflineReplica,并将其从ISR集合删除,此时会发送StopReplicaRequest(不删除副本)、LeaderAndIsrRequest和UpdateMetaRequest更新可用Broker的MetadataCache。
KafkaController.onBrokerStartup()方法的实现如下:

Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 继续上面的示例,当Broker0重新上线后会创建临时节点“/brokers/ids/0”,触发BrokerChangeListener处理。首先同样是更新ControllerContext的Broker列表。然后将三个副本转换成OnlineReplica状态,此时每个分区都已经有了Leader副本和ISR集合信息,所以向对应副本发送LeaderAndIsrRequest使其成为Follower,并向可用Broker发送UpdateMetadataRequest更新MetadataCache信息。此示例没有Partition处于OfflinePartition状态,也没有需要进行重新分配的Partition,所以后续步骤没有执行。
IsrChangeNotificationListener
在前面介绍过,Follower副本会与Leader副本会进行消息同步,当Follower副本追上Leader副本时会被添加到ISR集合中,当Follower副本与Leader副本差距太大时会被踢出ISR集合。Leader副本不仅会在ISR集合变化时将其记录到ZooKeeper中,还会调用ReplicaManager.recordIsrChange()方法记录到isrChangeSet集合中,之后通过isr-changepropagation定时任务将该集合中周期性地写入到ZooKeeper的“/isr_change_notification”路径下。KafkaController中定义的IsrChangeNotificationListener用于监听此路径下的子节点变化,当某些分区的ISR集合变化时通知整个集群中的所有Broker。
IsrChangeNotificationListener.handleChildChange()方法的具体实现如下:
Kafka服务端之KafkaController
文章图片
image.png PreferredReplicaElectionListener
PreferredReplicaElectionListener负责监听的ZooKeeper节点是“/admin/preferred_replica_election”。当我们通过PreferredReplicaLeaderElectionCommand命令指定某些分区需要进行“优先副本”选举时会将指定分区的信息写入该节点,从而触发PreferredReplicaElectionListener进行处理。进行“优先副本”选举的目的是让分区的“优先副本”重新成为Leader副本,这是为了让Leader副本在整个集群中分布得更加均衡。
PreferredReplicaElectionListener.handleDataChange()方法的具体实现如下:

Kafka服务端之KafkaController
文章图片
image.png onPreferredReplicaElection()方法的核心是通过PreferredReplicaPartitionLeaderSelector选举Leader副本和ISR集合。PreferredReplicaPartitionLeaderSelector选举成功的条件是“优先副本”不是当前Leader副本,但是要在ISR集合中,否则会抛出异常。经过选举后的ISR集合还是当前的ISR集合,但分区的Leader副本变成了“优先副本”。
Kafka服务端之KafkaController
文章图片
image.png onPreferredReplicaElection()方法还会被一个名为"partition-rebalance"的定时任务调用,此任务会定期检测集群中“优先副本”与Leader副本的分配情况,并判断是否触发“优先副本”选举,后面会详细介绍该定时任务。
副本重新分配的相关Listener PartitionsReassignedListener监听的ZooKeeper节点是“/admin/reassign_partitions”。当管理人员通过ReassignPartitionsCommand命令指定某些分区需要重新分配副本时,会将指定分区的信息写入该节点,从而触发PartitionsReassignedListener进行处理。
下面是整个副本重新分配的步骤:
(1)从ZooKeeper的“/admin/reassign_partitions”节点下读取分区重分配信息。
(2)过滤掉正在进行重新分配的分区。
(3)检测其Topic是否为待删除的Topic。如果是,则调用KafkaController. removePartitionFromReassignedPartitions()方法,其操作如下:
a)取消此分区注册的ReassignedPartitionsIsrChangeListener。
b)删除ZooKeeper的“/admin/reassign_partitions”节点中与当前分区相关的数据。
c)从partitionsBeingReassigned集合中删除分区相关的数据。
(4)否则,创建ReassignedPartitionsContext,调用initiateReassignReplicasForTopicPa rtition()方法开始为重新分配副本做一些准备工作。
a)首先,获取当前的旧AR集合和指定的新AR集合。
b)比较新旧两个AR集合,若两者完全一样则抛出异常,执行步骤3的操作后结束。
c)判断新AR集合中涉及的Broker是否都是可用的,若不是,也抛出异常,执行步骤3的操作后结束。
d)为分区添注册ReassignedPartitionsIsrChangeListener,后面会详细介绍此Listener。
e)将分区添加到partitionsBeingReassigned集合中,并标识该Topic不能被删除。
f)调用onPartitionReassignment()方法,开始执行副本重新分配。
5)onPartitionReassignment()方法完成了副本重新分配的整个流程。在下面的描述中使用“新AR+旧AR”表示新AR集合和旧AR集合的并集,“新AR-旧AR”表示新AR集合与旧AR集合的差集。
(6)判断新AR集合中的所有副本是否已经进入了ISR集合。如果没有,则执行下面的步骤:
a)将分区在ContextController和ZooKeeper中的AR集合更新成“新AR+旧AR”。
b)向“新AR+旧AR”发送LeaderAndIsrRequest,此步骤主要目的是为了增加ZooKeeper中记录的leader_epoch值。
c)将“新AR-旧AR”中的副本更新成NewReplica状态,此步骤会向这些副本发送LeaderAndIsrRequest使其成为Follower副本,并发送UpdateMetadataRequest。
(7)如果新AR集合中的副本已经都进入了ISR集合,则执行下面的步骤:
a)将新AR集合中的所有副本都转换成OnlineReplica状态。
b)将ControllerContext中的AR记录更新为新AR集合。
c)如果当前Leader副本在新AR集合中,则递增ZooKeeper和ControllerContext中记录的leader_epoch值,并发送LeaderAndIsrRequest和UpdateMetadataRequest。
d)如果当前Leader不在新AR集合中或Leader副本不可用,则将分区状态转换为OnlinePartition(之前也是OnlinePartition),主要目的使用ReassignedPartitionLeaderSelec tor选举新的Leader副本,使得新AR集合中的一个副本成为新Leader副本,然后会发送LeaderAndIsrRequest和UpdateMetadataRequest。
e)将“旧AR-新AR”中的副本转换成OfflineReplica,此步骤会发送StopReplicaRequest(不删除副本),清理ISR集合中的相关副本,并发送LeaderAndISRRequest和UpdateMetadataRequest请求。
f)接着将“旧AR-新AR”中的副本转换成ReplicaDeletionStarted,此步骤会发送StopReplicaRequest(删除副本)。完成删除后,将副本转换为ReplicaDeletionSuccessful,最终转换成NonExistentReplica。
g)更新ZooKeeper中记录的AR信息。
h)将此分区的相关信息从ZooKeeper的“/admin/reassign_partitions”节点中移除。
i)向所有可用的Broker发送一次UpdateMetadataRequest。
j)尝试取消相关的Topic的“不可删除”标记,并唤醒DeleteTopicsThread线程。
上面描述的整个流程其实还涉及ReassignedPartitionsIsrChangeListener的相关内容,其中,步骤7就是在ReassignedPartitionsIsrChangeListener中完成的。
ReassignedPartitionsIsrChangeListener在上述步骤(4)→d中注册到ZooKeeper的“/broker/topics/[topic_name]/partitions/[partitionId]/state”节点上监听其数据变化,主要负责处理进行副本重新分配的分区的ISR集合变化。当ReassignedPartitionsIsrChangeListener监听到分区的ISR集合发生变化时,按照下列步骤进行处理:
(1)检查当前分区是否正在进行副本的重新分配操作,若不是,则结束。
(2)从ZooKeeper中读取当前分区的Leader和ISR集合。
(3)如果新AR集合中的副本已完全进入当前ISR集合,则调用onPartitionReassignment()方法完成步骤7的相关操作。
(4)否则,输出日志后结束,等待下一次触发。
为了让读者更好地理解副本重新分配的过程,通过一个示例描述这个过程:现在假设有Broker05这六个Broker,Topic名为“test”的编号为0的分区(Partition0)三个副本,分别位于Broker13上,其中Leader副本位于Broker1上。
管理人员使用ReassignPartitionsCommand命令将Partition0的副本重新分配到Broker3~5上。PartitionsReassignedListener被触发后首先为Partition0注册ReassignedP artitionsIsrChangeListener,并标记test这个Topic不能被删除。然后,将ZooKeeper和ControllerContext中Partition0的AR集合记录更新为[1,2,3,4,5]。发送LeaderAndIsrRequest使[4,5]副本成为Follower副本,并发送UpdateMetadataRequest更新可用Broker的MetadataCache。到这里PartitionsReassignedListener的相关处理就结束了。
随着[4,5]两个Follower副本与Leader副本进行同步,最终会进入ISR集合,此时会触发ReassignedPartitionsIsrChangeListener进行处理。首先将[3,4,5]副本转换为OnlineReplica状态,更新ControllerContext中对应的AR记录为[3,4,5]。之后,[3,4,5]副本已经处于ISR集合,且Leader副本不在[3,4,5]中,则需要使用ReassignedPartitionLead erSelector选举新的Leader。我们简单回顾ReassignedPartitionLeaderSelector的策略,它选取的新Leader副本必须在新指定的AR集合中且在ISR集合中,当前ISR集合为新ISR集合。接收LeaderAndIsrRequest的副本是新指定的AR集合。这里假设选举副本3为Leader副本,并向[3,4,5]三个副本发送LeaderAndIsrRequest请求。然后,将[1,2]副本转换为OfflineReplica,此时发送StopReplicaRequest停止副本,将其从ISR中移除,最终将这两个副本删除转换为NonExistentReplica状态。最后,更新ZooKeeper中的AR记录,标记取消“test”的“不可删除”标记,并唤醒DeleteTopicsThread线程。副本重新分配到此全部完成,此时AR集合和ISR集合都为[3,4,5],Leader为副本3。
PartitionsReassignedListener.handleDataChange()方法的具体实现如下:
Kafka服务端之KafkaController
文章图片
image.png KafkaController.initiateReassignReplicasForTopicPartition()方法的实现如下:

Kafka服务端之KafkaController
文章图片
image.png KafkaController.onPartitionReassignment()是副本重新分配的核心,其具体实现如下:

Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png ReassignedPartitionsIsrChangeListener的核心逻辑也是调用onPartitionReassignment()方法,这里不再赘述了。
KafkaController初始化与故障转移
KafkaController的主要功能和实现在前面几节中已经介绍完了。本节介绍KafkaController的初始化以及故障转移方面的内容。在Kafka集群中,只有一个Controller能够成为Leader来管理整个集群,而其他未成为Controller Leader的Broker上也会创建一个KafkaController对象,它们唯一能做的事情就是当Controller Leader出现故障,不能继续管理集群时,竞争成为新的Controller Leader。
KafkaController的启动和故障转移的过程与ZookeeperLeaderElector有着密切的关系,KafkaController. controllerElector字段ZookeeperLeaderElector类型的定义如下:
Kafka服务端之KafkaController
文章图片
image.png KafkaController启动的过程由KafkaController.startup()方法完成,其中会注册SessionExpirationListener,并启动ZookeeperLeaderElector。
Kafka服务端之KafkaController
文章图片
image.png SessionExpirationListener继承了IZkStateListener接口,监听KafkaController与ZooKeeper的连接状态。当KafkaController与ZooKeeper的连接超时后创建新连接时会触发SessionExpirationListener.handleNewSession()方法。
Kafka服务端之KafkaController
文章图片
image.png KafkaController的启动逻辑委托给了ZooKeeperLeaderElector.startup()方法完成。在ZookeeperLeaderElector中有两个比较重要的字段:
  • ·leaderId:缓存当前的Controller LeaderId。
  • ·leaderChangeListener:LeaderChangeListener会监听Zookeeper的“/controller”节点的数据变化,当此节点中保存的LeaderId发生变化时,会触发LeaderChangeListener进行相应的处理。
Kafka服务端之KafkaController
文章图片
image.png 当“/controller”节点中的数据被删除时会触发handleDataDeleted()方法进行处理。
Kafka服务端之KafkaController
文章图片
image.png ZookeeperLeaderElector.startup()方法的逻辑是注册LeaderChangeListener后,立即调用elect()方法尝试进行Controller Leader的选举。
Kafka服务端之KafkaController
文章图片
image.png KafkaController触发选举的地方有三处,如图4-65所示。依次是:第一次启动的时候;LeaderChangeListener监听到“/controller”节点中数据被删除;ZooKeeper连接过期并重新建立之后。

Kafka服务端之KafkaController
文章图片
image.png ZookeeperLeaderElector.elect()方法的具体实现如下:

Kafka服务端之KafkaController
文章图片
image.png onControllerFailover
ZookeeperLeaderElector.elect()方法中调用的回调函数onBecomingLeader()实际上是ZookeeperLeaderElector构造函数中传入的KafkaController.onControllerFailover()方法。当前Broker成功选举为Controller Leader时会通过该方法完成一系列的初始化操作,其具体步骤如下
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 此过程的组件我们在前面已经详细分析过了,这里我们关注一下initializeControllerContext()方法,了解ControllerContext到底从ZooKeeper中读取了哪些数据。

Kafka服务端之KafkaController
文章图片
image.png Partition Rebalance
在KafkaController.onControllerFailover()方法中会启动一个名为“partition-rebalance”的周期性的定时任务,它提供了分区的自动均衡功能。该定时任务会周期性地调用KafkaController.checkAndTriggerPartitionRebalance()方法对失衡的Broker上相关的分区进行“优先副本”选举,使得相关分区的“优先副本”重新成为Leader副本,整个集群中Leader副本的分布也会重新恢复平衡
checkAndTriggerPartitionRebalance()方法首先获取“优先副本”所在的BrokerId与分区的对应关系,然后利用此对应关系,计算每个“优先副本”所在的Broker的“imbalance”比率,该值是当前Leader副本为非“优先副本”的分区与此Broker上分区总数的比值,当“imbalance”比率大于一定阈值,则触发“优先副本”选举。
为了读者更好地理解含义,现举例说明。假设集群中有0~2三个Broker,某Topic有15个分区,Leader副本即为“优先副本”,它们的分布如表4-6所示。
Kafka服务端之KafkaController
文章图片
image.png 当Kafka集群运行一段时间,期间某些Broker可能出现过宕机,导致Leader副本发生迁移,现在Leader副本的分布如表4-7所示。
Kafka服务端之KafkaController
文章图片
image.png 此时的Leader副本分布已经明显不均匀了,Broker2上分布了5个分区的副本,但当前有4个分区并不是以“优先副本”为Leader副本,Broker2的“imbalance”比率为4/5=80%,默认的阈值为10%,此时会触发对Partition5、Partition8、Partition11、Partition14的“优先副本”选举。
KafkaController.checkAndTriggerPartitionRebalance()方法的实现如下
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png KafkaController成为Controller Leader后进行的一系列操作的具体流程和实现到这里就介绍完了。下面我们来分析KafkaController如何从Controller Leader变成Follower。
onControllerResignation
通过前面对LeaderChangeListener源码的分析我们知道,当它监听到“/controller”中的数据被删除或改变时,旧的Controller Leader需要调用onResigningAsLeader()回调函数进行一些清理工作,它实际是ZookeeperLeaderElector构造函数中传入的KafkaController.onControllerResignation()方法,其实现如下:

Kafka服务端之KafkaController
文章图片
image.png
处理ControlledShutdownRequest
在前面介绍的BrokerChangeListener可以根据ZooKeeper中“/brokers/ids”的子节点变化,处理Broker故障宕机的场景。在有些场景中用户希望主动关闭正常运行的Broker,例如,更换硬件、操作系统升级、修改Kafka的配置等。如果依然使用上述方式关闭Broker,就略显粗暴。在Kafka中提供了Controlled Shutdown的方式来关闭一个Broker实例。
使用Controlled Shutdown的方式主动下线Broker有两个好处:一是可以让日志文件完全同步到磁盘上,在Broker下次重现上线时不需要进行Log的恢复操作;二是Controller Shutdown方式在关闭Broker之前,会对其上的Leader副本进行迁移,这样就可以减少分区的不可用时间。
在Kafka之前的版本中需要使用命令行工具向Controller Leader发送ControlledShutdownRequest请求。在本书分析的版本中,已经将该过程写成了JVM的关闭钩子。在Kafka.main()方法中可以看到这段代码:
Kafka服务端之KafkaController
文章图片
image.png KafkaController.shutdownBroker()方法是ControlledShutdownRequest的核心,该方法会使用ControlledShutdownLeaderSelector重新选择Leader副本和ISR集合,实现Leader副本的迁移。回顾一下ControlledShutdownLeaderSelector的策略:从当前ISR集合中排除正在关闭的副本后的剩余副本作为新ISR集合,从新ISR集合中选择新的Leader,需要向可用的AR发送LeaderAndIsrRequest。shutdownBroker()方法的代码如下所示。
Kafka服务端之KafkaController
文章图片
image.png Kafka服务端之KafkaController
文章图片
image.png 【Kafka服务端之KafkaController】在本节中我们介绍了ContextController如何管理集群相关的各种信息,包括Broker、Topic、分区、副本等元数据,以及读取、更新ZooKeeper的时机。了解了ControllerChannelManager和ControllerBrokerRequestBatch如何帮助KafkaController完成与集群中其他的Broker的通信,以及LeaderAndIsrRequest和UpdateMetadataRequest等请求的格式。介绍了管理分区状态的PartitionStateMachine组件和管理副本状态的ReplicaStateMachine,以及在不同场景下使用的PartitionLeaderSelector策略。通过注册不同多种不同类型的ZooKeeper Listener,可以实现多种管理功能,例如:Topic增删的相关处理、Broker故障下线的处理、分区的副本重新分配、“优先副本”选举,等等。我们还介绍了一个Broker如何成为Controller Leader,成为Controller Leader后的初始化流程以及放弃Controller Leader角色后的清理操作。最后,又分析了Controlled Shutdown方式下一个Broker的具体实现。希望读者通过本节的阅读,能够对KafkaController的功能和实现有清晰的了解。

    推荐阅读