apache kafka系列之源码分析走读-kafkaApi详解
Kafka源码中数据交互流程
文章图片
图1
1.概述 kafka启动时做很多初始化运行环境工作,具体请参考:apache kafka系列之源码分析走读-kafka内部模块分析
其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。
从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。
通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,
而是每个handler都是一个函数,混在KafkaApi类中。
2. Request请求类别 kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:
参数 |
说明(解释) |
请求二进制数据解码类 |
RequestKeys.ProduceKey |
producer请求 |
ProducerRequest |
RequestKeys.FetchKey |
consumer请求 |
FetchRequest |
RequestKeys.OffsetsKey |
topic的offset请求 |
OffsetRequest |
RequestKeys.MetadataKey |
topic元数据请求 |
TopicMetadataRequest |
RequestKeys.LeaderAndIsrKey |
leader和isr信息更新请求 |
LeaderAndIsrRequest |
RequestKeys.StopReplicaKey |
停止replica请求 |
StopReplicaRequest |
RequestKeys.UpdateMetadataKey |
更新元数据请求 |
UpdateMetadataRequest |
RequestKeys.ControlledShutdownKey |
controlledShutdown请求 |
ControlledShutdownRequest |
RequestKeys.OffsetCommitKey |
commitOffset请求 |
OffsetCommitRequest |
RequestKeys.OffsetFetchKey |
consumer的offset请求 |
OffsetFetchRequest |
下面是KafkaApi中handle方法代码:
[java]view plain copy
- def handle(request: RequestChannel.Request) {
- try{
- trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
- request.requestId match {
- case RequestKeys.ProduceKey => handleProducerRequest(request)// producer
- case RequestKeys.FetchKey => handleFetchRequest(request)// consumer
- case RequestKeys.OffsetsKey => handleOffsetRequest(request)
- case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
- case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
- case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
- case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
- case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)//shutdown broker
- case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
- case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
- case requestId => throw new KafkaException("Unknown api code " + requestId)
- }
- } catch {
- case e: Throwable =>
- request.requestObj.handleError(e, requestChannel, request)
- error("error when handling request %s".format(request.requestObj), e)
- } finally
- request.apiLocalCompleteTimeMs = SystemTime.milliseconds
- }
3.请求交互二进制数据格式 kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。 请求交互二进制数据 组成为:请求类型 + 请求数据。
文章图片
3.1 ProducerRequest二进制格式
文章图片
3.2 FetchRequest二进制格式
文章图片
3.3 OffsetRequest二进制格式
【apache kafka系列之源码分析走读-kafkaApi详解】
文章图片
3.4 TopicMetadataRequest二进制格式
文章图片
3.5 LeaderAndIsrRequest二进制格式
文章图片
3.6 StopReplicaRequest二进制格式
文章图片
3.7 UpdateMetadataRequest二进制格式
文章图片
3.8 ControlledShutdownRequest二进制格式
文章图片
3.9 OffsetCommitRequest二进制格式
文章图片
3.10 OffsetFetchRequest二进制格式
文章图片
推荐阅读
- 【欢喜是你·三宅系列①】⑶
- 你不可不知的真相系列之科学
- Apache多路复用模块(MPMs)介绍
- 人脸识别|【人脸识别系列】| 实现自动化妆
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 2018-06-13金句系列7(金句结构-改编古现代诗词)
- Unity和Android通信系列文章2——扩展UnityPlayerActivity
- 乡野村趣系列之烧仙草
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)
- 15、IDEA学习系列之其他设置(生成javadoc、缓存和索引的清理等)