kafka源码分析之kafkaserver的健康状态管理
KafkaHealthcheck 这个组件主要用于处理kafka server启动后与zk的通信,当kafka启动后,会向zk注册一个短命的节点,kafkaController会接收到这个kafka broker的信息,并监听broker的加入与销毁。
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
kafkaHealthcheck.startup()
实例生成时:
这个组件主要用于在/brokers/ids/brokerid下注册短命节点。
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
用于监听节点过期。
val sessionExpireListener = new SessionExpireListener
执行startup操作的流程:
def startup() {
向zk中注册监听器,用于监听session过期。
zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
/**
* Register this broker as "alive" in zookeeper
*/
def register() {
val jmxPort = System.getProperty("com.sun.management.jmxremote.port",
"-1").toInt
val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
if (endpoint.host == null || endpoint.host.trim.isEmpty)
EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port,
endpoint.protocolType)
else
endpoint
)
【kafka源码分析之kafkaserver的健康状态管理】
向zk中注册这个broker的连接信息,包含节点的ip,协议,端口,jmx端口。
在kafkaController中,通过BrokerChangeListener监听来对broker的下线与下线进行处理。
val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT,
new EndPoint(null,-1,null))
zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host,
plaintextEndpoint.port, updatedEndpoints, jmxPort)
}
关于session的监听实例SessionExpireListener。
在zk的session被创建成功时,通过register向broker对应的zk节点中注册这个节点的连接信息。
def handleNewSession() {
info("re-registering broker info in ZK for broker " + brokerId)
register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics"
.format(ZkUtils.BrokerTopicsPath))
}
推荐阅读
- 如何寻找情感问答App的分析切入点
- D13|D13 张贇 Banner分析
- 自媒体形势分析
- 2020-12(完成事项)
- Android事件传递源码分析
- Python数据分析(一)(Matplotlib使用)
- 深入浅出谈一下有关分布式消息技术(Kafka)
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- 泽宇读书会——如何阅读一本书笔记
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)