[Kafka Source Code] The Chain Reaction After a Broker Is Elected Controller

[TOC]
Today we look at what a broker does once it has been elected controller. Here is the entry-point code first:

    def onControllerFailover() {
      if (isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        //read controller epoch from zk
        readControllerEpochFromZookeeper()
        // increment the controller epoch
        incrementControllerEpoch(zkUtils.zkClient)
        // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        partitionStateMachine.registerListeners()
        replicaStateMachine.registerListeners()
        initializeControllerContext()
        replicaStateMachine.startup()
        partitionStateMachine.startup()
        // register the partition change listeners for all existing topics on failover
        controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
        info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
        brokerState.newState(RunningAsController)
        maybeTriggerPartitionReassignment()
        maybeTriggerPreferredReplicaElection()
        /* send partition leadership info to all live brokers */
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        if (config.autoLeaderRebalanceEnable) {
          info("starting the partition rebalance scheduler")
          autoRebalanceScheduler.startup()
          autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
            5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
        }
        deleteTopicManager.start()
      }
      else
        info("Controller has been shut down, aborting startup/failover")
    }

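This one entry point touches many listeners and other components, so we will go through them one by one.
1. Controller epoch
The controller first reads the previous epoch from the ZooKeeper node /controller_epoch, then increments it by 1 and persists the new value back to ZooKeeper.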
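The read, increment, persist sequence can be pictured as a versioned compare-and-set. The sketch below is only an illustration under stated assumptions: `VersionedNode` and `EpochSketch` are hypothetical names, the znode is replaced by an in-memory value, and the version check is my assumption about how a stale controller could be kept from overwriting a newer epoch. It is not Kafka's actual code.

```scala
// Hypothetical in-memory stand-in for the /controller_epoch znode:
// a value plus a version that changes on every successful write.
final case class VersionedNode(value: Int, version: Int)

object EpochSketch {
  // Conditional write: succeeds only if the caller saw the latest version.
  def conditionalUpdate(node: VersionedNode, newValue: Int, expectedVersion: Int): Option[VersionedNode] =
    if (node.version == expectedVersion) Some(VersionedNode(newValue, node.version + 1))
    else None

  // Read the previous epoch, add one, and persist it with a conditional write.
  def incrementEpoch(node: VersionedNode): Option[VersionedNode] =
    conditionalUpdate(node, node.value + 1, node.version)
}
```

A `None` result would mean another writer got there first, in which case the caller should not consider itself the controller.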
2. Registering listeners
This step subscribes to ZooKeeper nodes; when the data on a node changes, the controller reacts accordingly.
2.1 registerReassignedPartitionsListener
    private def registerReassignedPartitionsListener() = {
      zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
    }

The subscribed path is /admin/reassign_partitions, which describes partition reassignment. When its data changes, the following listener runs:
    /**
     * Starts the partition reassignment process unless -
     * 1. Partition previously existed
     * 2. New replicas are the same as existing replicas
     * 3. Any replica in the new set of replicas are dead
     * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
     * partitions.
     */
    class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
      this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
      val zkUtils = controller.controllerContext.zkUtils
      val controllerContext = controller.controllerContext

      /**
       * Invoked when some partitions are reassigned by the admin command
       * @throws Exception On any error.
       */
      @throws(classOf[Exception])
      def handleDataChange(dataPath: String, data: Object) {
        debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
          .format(dataPath, data))
        // parse the data stored on the zk node
        val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
        // collect the partitions that still need to be reassigned
        val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
          partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
        }
        partitionsToBeReassigned.foreach { partitionToBeReassigned =>
          inLock(controllerContext.controllerLock) {
            // if the topic is queued up for deletion, drop the partition from the reassignment list
            if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
              error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
                .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
              controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
            } else {
              // start the reassignment
              val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
              controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
            }
          }
        }
      }

      /**
       * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
       *
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleDataDeleted(dataPath: String) {
      }
    }

Next, let's look at the reassignment itself, that is, what initiateReassignReplicasForTopicPartition does.
2.1.1 initiateReassignReplicasForTopicPartition
    def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                                  reassignedPartitionContext: ReassignedPartitionsContext) {
      val newReplicas = reassignedPartitionContext.newReplicas
      val topic = topicAndPartition.topic
      val partition = topicAndPartition.partition
      // new replicas whose brokers are alive
      val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
      try {
        val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
        assignedReplicasOpt match {
          case Some(assignedReplicas) =>
            if (assignedReplicas == newReplicas) {
              throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
                " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
            } else {
              if (aliveNewReplicas == newReplicas) {
                info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
                // first register ISR change listener (watches ISR changes for this partition)
                watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
                controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
                // mark topic ineligible for deletion for the partitions being reassigned
                deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
                onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
              } else {
                // some replica in RAR is not alive. Fail partition reassignment
                throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                  " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
                  "Failing partition reassignment")
              }
            }
          case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
            .format(topicAndPartition))
        }
      } catch {
        case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
        // remove the partition from the admin path to unblock the admin client
        removePartitionFromReassignedPartitions(topicAndPartition)
      }
    }
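The three guard conditions in this method (the partition must exist, the new replica list must differ from the current one, and every new replica must be on a live broker) can be isolated as a pure function. This is a minimal sketch, not the controller's code: `ReassignmentCheck`, `validate` and the error strings are hypothetical, and plain `Int` broker ids stand in for Kafka's types.

```scala
object ReassignmentCheck {
  // Returns Right(newReplicas) when reassignment may start, or Left(reason) otherwise.
  def validate(
      currentAssignment: Option[Seq[Int]], // None means the partition does not exist
      newReplicas: Seq[Int],
      liveBrokerIds: Set[Int]): Either[String, Seq[Int]] = {
    // Keep only the new replicas whose brokers are alive.
    val aliveNewReplicas = newReplicas.filter(liveBrokerIds.contains)
    currentAssignment match {
      case None => Left("partition does not exist")
      case Some(assigned) if assigned == newReplicas => Left("already assigned to the new replicas")
      case Some(_) if aliveNewReplicas != newReplicas => Left("some new replicas are not alive")
      case Some(_) => Right(newReplicas)
    }
  }
}
```

In the real method every failure path ends the same way: the error is logged and the partition is removed from the admin path so the admin client is not blocked.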

Looking at watchIsrChangesForReassignedPartition, we find that the data listener it defines also ends up calling onPartitionReassignment, so let's go straight to onPartitionReassignment, which is the core of reassignment.
2.1.2 onPartitionReassignment
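This method is triggered by the partition reassignment listener. When an admin starts a reassignment, the admin command first creates the /admin/reassign_partitions path, which fires the ZooKeeper listener. A reassignment goes through the steps below, which use these terms:
RAR = reassigned replicas, the new replica list
OAR = original list of replicas for the partition
AR = current assigned replicas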
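  • 1. Update AR in ZooKeeper with OAR + RAR.
  • 2. Send a LeaderAndIsr request to every replica in AR. This is done by forcing an update of the leader epoch in ZooKeeper.
  • 3. Move the replicas in RAR - OAR to the NewReplica state, which starts the new replicas.
  • 4. Wait until all replicas in RAR are in sync with the leader.
  • 5. Move all replicas in RAR to the OnlineReplica state.
  • 6. Set AR to RAR in memory.
  • 7. If the leader is not in RAR, elect a new leader from RAR. If an election is needed, a LeaderAndIsr request is sent. If not, the leader epoch is incremented and a LeaderAndIsr request is sent anyway. In either case the request carries AR = RAR, which prevents the leader from adding replicas in RAR - OAR back into the ISR.
  • 8. Move all replicas in OAR - RAR to the OfflineReplica state. As part of that state change, the OAR - RAR replicas are removed from the ISR in ZooKeeper and a LeaderAndIsr request is sent to the leader to notify it of the shrunk ISR. After that, a StopReplica request is sent to the replicas in OAR - RAR.
  • 9. Move all replicas in OAR - RAR to the NonExistentReplica state. The StopReplica request sent here physically deletes these replicas.
  • 10. Update AR in ZooKeeper with RAR.
  • 11. Update the ZooKeeper node /admin/reassign_partitions to remove this partition.
  • 12. After the election, the replica and ISR information has changed, so resend the update-metadata request to every broker.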
The whole process is fairly convoluted and takes careful reading. The simple walk-through below can serve as a reference.
    * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
    * may go through the following transition.
    * AR                 leader/isr
    * {1,2,3}            1/{1,2,3}           (initial state)
    * {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
    * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
    * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
    * {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
    * {4,5,6}            4/{4,5,6}           (step 10)
    *
    * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
    * This way, if the controller crashes before that step, we can still recover.
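The replica sets used in these steps are plain set arithmetic on OAR and RAR. As a small sketch (the names `ReassignmentSets`, `Plan` and `plan` are hypothetical, and broker ids are plain `Int`s), they can be computed like this:

```scala
object ReassignmentSets {
  final case class Plan(
      expandedAr: Seq[Int],      // OAR + RAR, written to ZooKeeper in step 1
      replicasToStart: Seq[Int], // RAR - OAR, moved to NewReplica in step 3
      replicasToStop: Seq[Int])  // OAR - RAR, taken offline in step 8

  // Order is preserved so that the head of each list keeps its meaning.
  def plan(oar: Seq[Int], rar: Seq[Int]): Plan =
    Plan(
      expandedAr = (oar ++ rar).distinct,
      replicasToStart = rar.filterNot(oar.contains),
      replicasToStop = oar.filterNot(rar.contains))
}
```

For OAR = {1,2,3} and RAR = {4,5,6} this gives an expanded AR of {1,2,3,4,5,6}, with {4,5,6} to start and {1,2,3} to stop, matching the transition table above. With overlapping sets such as OAR = {1,2,3} and RAR = {2,3,4}, only replica 4 is started and only replica 1 is stopped.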

2.2 registerIsrChangeNotificationListener
This registers a listener on the path /isr_change_notification.
    /**
     * Called when leader intimates of isr change
     *
     * @param controller
     */
    class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {

      override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
        import scala.collection.JavaConverters._

        inLock(controller.controllerContext.controllerLock) {
          debug("[IsrChangeNotificationListener] Fired!!!")
          val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
          try {
            val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
            if (topicAndPartitions.nonEmpty) {
              controller.updateLeaderAndIsrCache(topicAndPartitions)
              processUpdateNotifications(topicAndPartitions)
            }
          } finally {
            // delete processed children
            childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
              ZkUtils.IsrChangeNotificationPath + "/" + x))
          }
        }
      }
      // the rest of the class is not shown in this excerpt
    }

This listener mainly refreshes the cached leader and ISR information (chiefly the controller epoch) and then sends update-metadata requests.
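2.3 registerPreferredReplicaElectionListener
This listens for data on the /admin/preferred_replica_election path. The preferred replica is the first replica in a partition's assigned replicas list. A preferred replica election makes it the leader again, for example after a leader failure has moved leadership to another replica.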
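3. Partition and replica state machines
3.1 Registering the partition state machine listeners
First comes the partition state machine. A partition can be in one of the following states:
  • NonExistentPartition: the partition does not exist; its only valid previous state is OfflinePartition
  • NewPartition: a new partition with no leader elected yet; previous state NonExistentPartition
  • OnlinePartition: the partition is online and a leader has been elected; previous state NewPartition or OfflinePartition
  • OfflinePartition: the partition is offline; previous state NewPartition or OnlinePartition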
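The "previous state" rules in this list amount to a small transition table. The sketch below transcribes exactly the rules listed above. The object name and the `canTransition` helper are hypothetical, and the real state machine may accept transitions that this list does not mention.

```scala
object PartitionStates {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Valid previous states for each target state, as listed in the text.
  def validPreviousStates(target: PartitionState): Set[PartitionState] = target match {
    case NonExistentPartition => Set(OfflinePartition)
    case NewPartition         => Set(NonExistentPartition)
    case OnlinePartition      => Set(NewPartition, OfflinePartition)
    case OfflinePartition     => Set(NewPartition, OnlinePartition)
  }

  // A transition is allowed only if `from` is a valid previous state of `to`.
  def canTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Under these rules a partition cannot jump from NonExistentPartition straight to OnlinePartition; it has to pass through NewPartition first.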
    // register topic and partition change listeners
    def registerListeners() {
      registerTopicChangeListener()
      if(controller.config.deleteTopicEnable)
        registerDeleteTopicListener()
    }

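registerListeners watches for changes under the /brokers/topics path and, if topic deletion is enabled, under the /admin/delete_topics path as well.
Let's look at what each of the two listeners does.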
3.1.1 registerTopicChangeListener
This listener handles topic changes under /brokers/topics, including the follow-up work for added and deleted topics.
    /**
     * This is the zookeeper listener that triggers all the state transitions for a partition
     */
    class TopicChangeListener extends IZkChildListener with Logging {
      this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

      @throws(classOf[Exception])
      def handleChildChange(parentPath : String, children : java.util.List[String]) {
        inLock(controllerContext.controllerLock) {
          if (hasStarted.get) {
            try {
              val currentChildren = {
                import JavaConversions._
                debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
                (children: Buffer[String]).toSet
              }
              val newTopics = currentChildren -- controllerContext.allTopics
              val deletedTopics = controllerContext.allTopics -- currentChildren
              controllerContext.allTopics = currentChildren

              val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
              controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
                !deletedTopics.contains(p._1.topic))
              controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
              info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
                deletedTopics, addedPartitionReplicaAssignment))
              if(newTopics.size > 0)
                controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
            } catch {
              case e: Throwable => error("Error while handling new topic", e )
            }
          }
        }
      }
    }
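The heart of this listener is a set difference between the children currently in ZooKeeper and the topics the controller already knows, followed by a cache update. A simplified sketch follows, with hypothetical names (`TopicDiff`, `onChildChange`, `fetchAssignment`) and a `(topic, partition)` tuple standing in for `TopicAndPartition`:

```scala
object TopicDiff {
  // (topic, partition) -> assigned replica ids; a simplified stand-in for the controller cache.
  type Assignment = Map[(String, Int), Seq[Int]]

  final case class Result(newTopics: Set[String], deletedTopics: Set[String], assignment: Assignment)

  def onChildChange(
      knownTopics: Set[String],
      currentChildren: Set[String],
      cached: Assignment,
      fetchAssignment: Set[String] => Assignment): Result = {
    val newTopics = currentChildren -- knownTopics
    val deletedTopics = knownTopics -- currentChildren
    // Drop partitions of deleted topics, then add the partitions of new topics.
    val kept = cached.filter { case ((topic, _), _) => !deletedTopics.contains(topic) }
    Result(newTopics, deletedTopics, kept ++ fetchAssignment(newTopics))
  }
}
```

Passing `fetchAssignment` as a function keeps the sketch independent of ZooKeeper; in the listener that role is played by `zkUtils.getReplicaAssignmentForTopics`.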

3.1.2 registerDeleteTopicListener
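This listens on the ZooKeeper node and puts the topics to be deleted into a pending-deletion queue. Kafka then performs the deletion, which mainly removes the related ZooKeeper nodes and the log files.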
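3.2 Registering the replica state machine listeners
The replica state machine has the following states:
  • NewReplica: the controller creates new replicas during partition reassignment. In this state a replica can only receive become-follower requests. Previous state: NonExistentReplica.
  • OnlineReplica: the state once a replica has started. In this state it can receive become-leader or become-follower requests. Previous state: NewReplica, OnlineReplica or OfflineReplica.
  • OfflineReplica: the state a replica enters when it goes down. Previous state: NewReplica or OnlineReplica.
  • ReplicaDeletionStarted: the state when replica deletion begins. Previous state: OfflineReplica.
  • ReplicaDeletionSuccessful: the state after the replica answers the delete request without an error code. Previous state: ReplicaDeletionStarted.
  • ReplicaDeletionIneligible: the state when replica deletion fails. Previous state: ReplicaDeletionStarted.
  • NonExistentReplica: the state after the replica has been deleted successfully. Previous state: ReplicaDeletionSuccessful.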
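One way to check one's reading of this list is to validate a whole sequence of states against it, for example the deletion path of a replica. The sketch below keys the table by state name for brevity and encodes only the rules listed above. `ReplicaLifecycle` and `isValidPath` are hypothetical names, and the real state machine may allow more transitions.

```scala
object ReplicaLifecycle {
  // Target state -> valid previous states, as listed in the text.
  val validPrevious: Map[String, Set[String]] = Map(
    "NewReplica" -> Set("NonExistentReplica"),
    "OnlineReplica" -> Set("NewReplica", "OnlineReplica", "OfflineReplica"),
    "OfflineReplica" -> Set("NewReplica", "OnlineReplica"),
    "ReplicaDeletionStarted" -> Set("OfflineReplica"),
    "ReplicaDeletionSuccessful" -> Set("ReplicaDeletionStarted"),
    "ReplicaDeletionIneligible" -> Set("ReplicaDeletionStarted"),
    "NonExistentReplica" -> Set("ReplicaDeletionSuccessful"))

  // True when every consecutive pair of states in the path is an allowed transition.
  def isValidPath(path: Seq[String]): Boolean =
    path.zip(path.drop(1)).forall { case (from, to) =>
      validPrevious.get(to).exists(_.contains(from))
    }
}
```

The path OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica passes, while skipping OfflineReplica on the way to ReplicaDeletionStarted does not.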
3.2.1 registerBrokerChangeListener
This listens for node changes under the /brokers/ids path, mainly to detect brokers being added or removed, and then reacts accordingly.
    /**
     * This is the zookeeper listener that triggers all the state transitions for a replica
     */
    class BrokerChangeListener() extends IZkChildListener with Logging {
      this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
      def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
        inLock(controllerContext.controllerLock) {
          if (hasStarted.get) {
            ControllerStats.leaderElectionTimer.time {
              try {
                val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
                val curBrokerIds = curBrokers.map(_.id)
                val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
                val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
                val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
                val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
                controllerContext.liveBrokers = curBrokers
                val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
                val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
                val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
                info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                  .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
                newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
                deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
                if(newBrokerIds.size > 0)
                  controller.onBrokerStartup(newBrokerIdsSorted)
                if(deadBrokerIds.size > 0)
                  controller.onBrokerFailure(deadBrokerIdsSorted)
              } catch {
                case e: Throwable => error("Error while handling broker changes", e)
              }
            }
          }
        }
      }
    }

3.3 Initializing the controller context
This step loads the baseline data, such as topics and partitions, and then starts several managers.
    private def initializeControllerContext() {
      // update controller cache with delete topic information
      // live brokers in the cluster
      controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
      // all topics
      controllerContext.allTopics = zkUtils.getAllTopics().toSet
      // replica assignment for the partitions of every topic
      controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
      // leader information per partition
      controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
      // brokers that are shutting down, empty at first
      controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
      // update the leader and isr cache for all existing partitions from Zookeeper
      updateLeaderAndIsrCache()
      // start the channel manager
      startChannelManager()
      initializePreferredReplicaElection()
      initializePartitionReassignment()
      initializeTopicDeletion()
      info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
      info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
      info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
    }

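The first few lines are commented and fairly clear, so let's start from startChannelManager. What is this ChannelManager? It is the channel the controller uses to communicate with each broker, and the manager is what manages the requests sent over it.
It mainly handles the following kinds of requests: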
  • LEADER_AND_ISR
  • STOP_REPLICA
  • UPDATE_METADATA_KEY
Once the channel has started, three initialization steps follow:
  • initializePreferredReplicaElection
  • initializePartitionReassignment
  • initializeTopicDeletion
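3.4 Starting the replica state machine
This is replicaStateMachine.startup(). The method reads the partition information from ZooKeeper and moves all live replicas to the OnlineReplica state.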
    /**
     * Invoked on successful controller election. First registers a broker change listener since that triggers all
     * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
     * Then triggers the OnlineReplica state change for all replicas.
     */
    def startup() {
      // initialize replica state
      initializeReplicaState()
      // set started flag
      hasStarted.set(true)
      // move all Online replicas to Online
      handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
      info("Started replica state machine with initial state -> " + replicaState.toString())
    }

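3.5 Starting the partition state machine
Like the replica state machine, this initializes the partition states and then tries to move the partitions to the OnlinePartition state.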
    /**
     * Invoked on successful controller election. First registers a topic change listener since that triggers all
     * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
     * the OnlinePartition state change for all new or offline partitions.
     */
    def startup() {
      // initialize partition state
      initializePartitionState()
      // set started flag
      hasStarted.set(true)
      // try to move partitions to online state
      triggerOnlinePartitionStateChange()

      info("Started partition state machine with initial state -> " + partitionState.toString())
    }

3.6 Automatic leader rebalance timer
If auto.leader.rebalance.enable is turned on, the partition rebalance timer is started. The leader.imbalance.check.interval.seconds setting controls the interval between checks, in seconds.
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
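The schedule call above runs the check after an initial delay of 5 seconds and then once per configured interval. To make the timing concrete, here is a rough equivalent built on the JDK's `ScheduledExecutorService`. This is a stand-in, not Kafka's scheduler class, and `RebalanceTimer` and `start` are hypothetical names.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object RebalanceTimer {
  // Daemon thread so the timer never keeps the JVM alive on its own.
  private val factory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "partition-rebalance-thread")
      t.setDaemon(true)
      t
    }
  }

  // Runs `check` after `initialDelay`, then every `period`, in the given unit.
  def start(check: () => Unit, initialDelay: Long, period: Long, unit: TimeUnit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor(factory)
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = check() }, initialDelay, period, unit)
    scheduler
  }
}
```

A call such as `RebalanceTimer.start(() => println("check"), 5, 300, TimeUnit.SECONDS)` mirrors the shape of the call above, with 300 as an example interval. Note that with `scheduleAtFixedRate`, an exception thrown by `check` suppresses all later runs, so a real check should catch its own errors.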

The checkAndTriggerPartitionRebalance method deserves a closer look.
    private def checkAndTriggerPartitionRebalance(): Unit = {
      if (isActive()) {
        trace("checking need to trigger partition rebalance")
        // get all the active brokers
        var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
        inLock(controllerContext.controllerLock) {
          preferredReplicasForTopicsByBrokers =
            controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
              case (topicAndPartition, assignedReplicas) => assignedReplicas.head
            }
        }
        debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
        // for each broker, check if a preferred replica election needs to be triggered
        preferredReplicasForTopicsByBrokers.foreach {
          case (leaderBroker, topicAndPartitionsForBroker) => {
            var imbalanceRatio: Double = 0
            var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
            inLock(controllerContext.controllerLock) {
              topicsNotInPreferredReplica =
                topicAndPartitionsForBroker.filter {
                  case (topicPartition, replicas) => {
                    controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                    controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                  }
                }
              debug("topics not in preferred replica " + topicsNotInPreferredReplica)
              val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
              val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
              imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
              trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
            }
            // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
            // that need to be on this broker
            if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
              topicsNotInPreferredReplica.foreach {
                case (topicPartition, replicas) => {
                  inLock(controllerContext.controllerLock) {
                    // do this check only if the broker is live and there are no partitions being reassigned currently
                    // and preferred replica election is not in progress
                    if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                        controllerContext.partitionsBeingReassigned.size == 0 &&
                        controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                        !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                        controllerContext.allTopics.contains(topicPartition.topic)) {
                      onPreferredReplicaElection(Set(topicPartition), true)
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
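Stripped of locking and logging, the method groups partitions by their preferred replica (the head of the assigned replica list), computes for each broker the share of those partitions it does not currently lead, and compares that share with a percentage threshold. A minimal sketch of just that arithmetic follows. The names `LeaderImbalance`, `imbalanceRatios` and `brokersToRebalance` are hypothetical, partitions are identified by plain strings, and every replica list is assumed to be non-empty.

```scala
object LeaderImbalance {
  // assignment: partition -> assigned replicas (the head is the preferred replica)
  // leaders:    partition -> current leader broker id
  def imbalanceRatios(
      assignment: Map[String, Seq[Int]],
      leaders: Map[String, Int]): Map[Int, Double] =
    assignment
      .groupBy { case (_, replicas) => replicas.head }
      .map { case (preferredBroker, partitions) =>
        // Count partitions that have a known leader other than the preferred broker.
        val notLedByPreferred = partitions.keys.count { p =>
          leaders.get(p).exists(_ != preferredBroker)
        }
        preferredBroker -> (notLedByPreferred.toDouble / partitions.size)
      }

  // Brokers whose ratio exceeds the threshold, given as a percentage.
  def brokersToRebalance(ratios: Map[Int, Double], thresholdPercent: Int): Set[Int] =
    ratios.collect { case (broker, ratio) if ratio > thresholdPercent.toDouble / 100 => broker }.toSet
}
```

For example, if broker 1 is the preferred replica for two partitions but leads only one of them, its ratio is 0.5, which exceeds a threshold of 10 percent, so the partitions it does not lead become candidates for a preferred replica election.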

3.7 Starting topic deletion
If topic deletion is allowed (delete.topic.enable=true), the topic deletion manager is started.
