Kafka|Kafka 源码学习(动态配置)
Kafka 动态配置实现
Kafka 的动态配置基于 Zookeeper 实现,本文主要梳理了 Kafka(version:2.8) 中动态配置的实现逻辑。
背景信息
在 Kafka 中,Zookeeper 客户端没有使用常见的客户端工具(如:Curator),而是直接基于原生的客户端实现了自己的 KafkaZkClient,将一些通用或特有的 Zookeeper 操作封装在内。因此,关于 Zookeeper 的使用及回调等逻辑也完全是独立实现的。另外,由于 Zookeeper 中一个节点下的 Watcher 顺序触发,如果同一个节点下有大量的 Watcher,将会产生性能瓶颈。下面将基于这些背景信息来介绍 Kafka 是如何基于 Zookeeper 实现高效的动态配置管理的。
Kafka 动态配置 Zookeeper 目录结构
在当前版本的 Kafka 中,动态配置类型有 5 种:topic, client, user, broker, ip。Kafka 动态配置的目录结构为: /config/entityType/entityName
,entityType 代表配置的类型,entityName 代表具体某个实体,比如:Topic 类型对应的实体就是具体某个 Topic,某个配置类型下所有实体的默认配置的 entityName 为 /config/topic/
的节点信息中,Topic AAA 的所有动态配置会放在 /config/topic/AAA
的节点信息中。
Listener 实现
下面介绍 Kafka 中 Zookeeper Listener 的实现。既然是基于 Zookeeper 实现,必然少不了 Zookeeper 的 Watcher 通知机制,但是,如背景信息中所说,在 Watcher 数量过多的情况下,会存在性能瓶颈。以 Topic 配置变更为例,在生产环境中,一个 Topic 的 Partition 数量可能多达上千,如果每个 Partition Leader 都去监听这个 Topic 配置信息,那么在一个 Kafka 集群内,仅监听 Topic 配置的 Watcher 就会有上万个甚至更多。Kafka 通过独立的通知机制来避免了这一问题,即:每次 AdminClient 进行配置变更时,会在 /config/changes/
目录下创建以 config_change_
为前缀的顺序节点,Wather 只监听 /config/changes/
目录的孩子节点变化,所以对于动态配置来说,所有 Broker 只监听 /config/changes/
这一个目录,大大减少集群整体的 Watcher 数量。
Kafka 中动态配置的 Zookeeper Listener 的实现在 ZkNodeChangeNotificationListener
类中,该类监听指定目录下的顺序节点添加动作,在收到子节点变化通知后,ZkNodeChangeNotificationListener
一方面执行通知动作,通知对应的 Handler 处理配置变更,另一面会清除所有已经处理过的配置变更。
下面对 ZkNodeChangeNotificationListener
类的实现进行介绍,主要分为以下几个部分:
a. 初始化:注册 zk 连接状态变更的 Handler 和 zk 子节点变更的 Handler;调用一次 `addChangeNotification()` 触发一次配置变更的处理,用来初始化动态配置;启动用来处理配置变更事件的线程 `ChangeEventProcessThread`。
b. 配置变更处理:每次 zk 状态变更或者动态配置变更都会向 queue 中放入一个处理事件,与此同时,`ChangeEventProcessThread` 会持续不断的从 queue 中取出事件,执行对应的处理动作,即:`processNotifications()`。
c. 清除过期通知:每次执行完 `processNotifications()`,都会调用 `purgeObsoleteNotifications` 执行过期通知的清理动作,删除所有进行本次 `processNotifications()` 之前创建的所有变更通知。
class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val queue = new LinkedBlockingQueue[ChangeNotification]
private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
private val isClosed = new AtomicBoolean(false)def init(): Unit = {
// ZkStateChangeHandler 和 ChangeNotificationHandler 都是 addChangeNotification()
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
addChangeNotification()
thread.start()
}def close() = {
···
}/**
* Process notifications
*/
private def processNotifications(): Unit = {
try {
val notifications = zkClient.getChildren(seqNodeRoot).sorted
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
val now = time.milliseconds
for (notification <- notifications) {
val changeId = changeNumber(notification)
// 只处理更新的变更信息
if (changeId > lastExecutedChange) {
// 调用 notificationHandler.processNotification() 进行配置变更的处理
processNotification(notification)
lastExecutedChange = changeId
}
}
purgeObsoleteNotifications(now, notifications)
}
} catch {
···
}
}···
···private def addChangeNotification(): Unit = {
if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification)
}class ChangeNotification {
def process(): Unit = processNotifications()
}private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
val (data, stat) = zkClient.getDataAndStat(notificationNode)
if (data.isDefined) {
if (now - stat.getCtime > changeExpirationMs) {
debug(s"Purging change notification $notificationNode")
zkClient.deletePath(notificationNode)
}
}
}
}/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLongclass ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = queue.take().process()
}
···
···
}
Handler 实现
上面介绍了配置变更通知是如何接收的,实际的处理在 NotificationHandler.processNotification()
中进行,对于动态配置来说, NotificationHandler
接口的实现是类 ConfigChangedNotificationHandler
, ConfigChangedNotificationHandler
的 processNotification
会根据配置变更通知版本对配置变更通知内容进行解析,然后调用对应的类型的 ConfigHandler
进行配置更新。
下面对
{"version" : 1, "entity_type":"topic/client", "entity_name" : "
, version2 的内容为 {"version" : 2, "entity_path":"topic/
,当 entity_name 为
,表示所有 entity 的默认配置,例如: /config/topics/
中的配置表示所有 topic 的默认动态配置。ConfigHandler
的实现类进行简单介绍:
其它补充
【Kafka|Kafka 源码学习(动态配置)】在 broker 初始化 partition 的过程中,该 topic 的配置可能会发生变化,为了避免漏掉这部分配置的更新,会在
主要处理 3 类配置:
a. LogManager
中管理的 topic 配置
b. 副本限流配置
c. controller 中动态开关配置 "unclean.leader.election.enable"
都继承自 QuotaConfigHandler
,用来更新客户端侧的限流配置,ClientIdConfigHandler
负责 client id 维度的限流配置更新, UserConfigHandler
用来负责用户维度的限流配置更新
负责连接维度 ConnectionQuotas
的限流配置更新
一方面负责 broker 相关的 quota 配置,另一方面负责 broker 动态配置的更新。broker 的动态配置逻辑在类 DynamicBrokerConfig
中实现,主要逻辑是根据以下优先级顺序进行 broker 配置的更新和覆盖:
a. DYNAMIC_BROKER_CONFIG:存储在 ZK 中的 /configs/brokers/{brokerId}
b. DYNAMIC_DEFAULT_BROKER_CONFIG:存储在 ZK 中的 /configs/brokers/
c. STATIC_BROKER_CONFIG:broker 启动配置,通常来自 server.properties 文件
d. DEFAULT_CONFIG:KafkaConfig
中硬编码的默认配置createLog
过程中记录配置变更的情况,在 createLog
结束后处理这部分配置的更新, 具体可以参考:https://issues.apache.org/jir...
推荐阅读
- 机器学习笔记(二)1
- 在线学习培训系统2022版|在线学习培训系统2022版|视频点播|在线题库|在线考试
- Arrays.toString() 的用法
- python机器学习sklearn实现识别数字
- 机器学习|pyhton 机器学习 sklearn——手把手教你预测心脏病
- python|python机器学习基础01——sklearn开启
- python|python机器学习基础05——sklearn之逻辑回归+分类评价指标
- 机器学习|机器学习—KNN算法
- Python|机器学习1——手写数字识别
- Nacos配置中心集群原理及源码分析