apache kafka系列之源码分析走读-kafkaApi详解

Kafka源码中数据交互流程
apache kafka系列之源码分析走读-kafkaApi详解
文章图片


图1
1.概述 kafka启动时做很多初始化运行环境工作,具体请参考:apache kafka系列之源码分析走读-kafka内部模块分析
其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。
从上图1中可以看到整个交互过程中,kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。

通过请求的类型,把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,
而是每个handler都是一个函数,混在KafkaApi类中。


2. Request请求类别 kafka-0.8.1版本中定义了10种类型请求,请求类型说明如下:

参数
说明(解释)
请求二进制数据解码类
RequestKeys.ProduceKey
producer请求
ProducerRequest
RequestKeys.FetchKey
consumer请求
FetchRequest
RequestKeys.OffsetsKey
topic的offset请求
OffsetRequest
RequestKeys.MetadataKey
topic元数据请求
TopicMetadataRequest
RequestKeys.LeaderAndIsrKey
leader和isr信息更新请求
LeaderAndIsrRequest
RequestKeys.StopReplicaKey
停止replica请求
StopReplicaRequest
RequestKeys.UpdateMetadataKey
更新元数据请求
UpdateMetadataRequest
RequestKeys.ControlledShutdownKey
controlledShutdown请求
ControlledShutdownRequest
RequestKeys.OffsetCommitKey
commitOffset请求
OffsetCommitRequest
RequestKeys.OffsetFetchKey
consumer的offset请求
OffsetFetchRequest

下面是KafkaApi中handle方法代码:

[java]view plain copy
  1. def handle(request: RequestChannel.Request) {
  2. try{
  3. trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
  4. request.requestId match {
  5. case RequestKeys.ProduceKey => handleProducerRequest(request)// producer
  6. case RequestKeys.FetchKey => handleFetchRequest(request)// consumer
  7. case RequestKeys.OffsetsKey => handleOffsetRequest(request)
  8. case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
  9. case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
  10. case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
  11. case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
  12. case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)//shutdown broker
  13. case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
  14. case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
  15. case requestId => throw new KafkaException("Unknown api code " + requestId)
  16. }
  17. } catch {
  18. case e: Throwable =>
  19. request.requestObj.handleError(e, requestChannel, request)
  20. error("error when handling request %s".format(request.requestObj), e)
  21. } finally
  22. request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  23. }

3.请求交互二进制数据格式 kafka中客户端与server端交互有多种类型,那它是怎么交互数据呢,格式是怎样?下面来揭开面纱。 请求交互二进制数据 组成为:请求类型 + 请求数据。 apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.1 ProducerRequest二进制格式 apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.2 FetchRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.3 OffsetRequest二进制格式
【apache kafka系列之源码分析走读-kafkaApi详解】 apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.4 TopicMetadataRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.5 LeaderAndIsrRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.6 StopReplicaRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.7 UpdateMetadataRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.8 ControlledShutdownRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.9 OffsetCommitRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片

3.10 OffsetFetchRequest二进制格式
apache kafka系列之源码分析走读-kafkaApi详解
文章图片


    推荐阅读