Kafka源码之KafkaConsumer分析之ConsumerNetworkClient

【Kafka源码之KafkaConsumer分析之ConsumerNetworkClient】我们先来看一下KafkaConsumer几个核心的方法:
subscribe:订阅指定的Topic,并为消费者消费者自动分配分区
assign:用户手动指定的Topic,并且指定消费的分区。
commit:提交消费者已经完成的offset
seek:指定消费者起始消费的位置
poll:负责从服务端获取消息
pause/resume:暂停/继续Consumer
在之前我们介绍了NetworkClient的实现,它依赖于KSelector,InFlightRequests、Metadata等组件,负责管理客户端于Kafa集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列的handle方法处理请求的响应、超时请求以及断线重连。ConsumerNetworkClient在NetworkClient上进行了封装,提供了更高级的功能和更易用的API:
client:NetworkClient对象
delayedTasks:定时任务队列
metadata:用于管理Kafka集群元数据
unsent:缓冲队列
unsentExpiryMs:ClientRequest在unsent中缓存的超时时长
wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程
wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。
下面看他的几个核心方法:

private boolean trySend(long now) { boolean requestsSent = false; //遍历unsent集合 for (Map.Entry> requestEntry: unsent.entrySet()) { //获取节点 Node node = requestEntry.getKey(); //获取要发往这个节点的请求集合 Iterator iterator = requestEntry.getValue().iterator(); while (iterator.hasNext()) { ClientRequest request = iterator.next(); //当前节点是否可以发送 if (client.ready(node, now)) { //调用send方法等待发送 client.send(request, now); iterator.remove(); requestsSent = true; } } } return requestsSent; }

遍历unsent集合,里面保存了要发送的指定节点的所有请求,每次发送都要判断是否要发送的那个节点准备好了,如果符合条件就会条用NetworkClient.send方法将请求放入InFlightRequests队中等待响应,也放去send字段中等待发送,并将此消息从列表中删除。
private void maybeTriggerWakeup() { //检查是否由其他线程中断 if (wakeupDisabledCount == 0 && wakeup.get()) { wakeup.set(false); throw new WakeupException(); } }

根据wakeupDisabledCount 和wakeup查看是否有其他线程中断,如果有中断请求那么抛出异常
private void checkDisconnects(long now) { // 遍历剩下的还没有发送请求 Iterator>> iterator = unsent.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry> requestEntry = iterator.next(); Node node = requestEntry.getKey(); //查看是否是因为连接失败导致的 if (client.connectionFailed(node)) { iterator.remove(); //如果是需要调用它的回调函数 for (ClientRequest request : requestEntry.getValue()) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.onComplete(new ClientResponse(request, now, true, null)); } } } }

这个方法会检查unsent队列中那些还没有发送的请求是不是因为连接失败的原因导致的,如果是,那么需要从队列中删除这个请求,然后调用他的回调方法。
private void failExpiredRequests(long now) { Iterator>> iterator = unsent.entrySet().iterator(); //遍历unsent队列 while (iterator.hasNext()) { Map.Entry> requestEntry = iterator.next(); Iterator requestIterator = requestEntry.getValue().iterator(); while (requestIterator.hasNext()) { ClientRequest request = requestIterator.next(); //如果当前请求已经超时,从队列中删除它,然后调用他的回调方法 if (request.createdTimeMs() < now - unsentExpiryMs) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); requestIterator.remove(); } else break; } if (requestEntry.getValue().isEmpty()) iterator.remove(); } }

在这个方法中会将unsent中超时的任务从队列中移除并且会调用它的回调方法。
我们把ConsumerNetworkClient中poll用到的一些方法介绍了,下面我们就要开始看一下poll的整体流程了:
private void poll(long timeout, long now, boolean executeDelayedTasks) { // 发送unsent中的请求 trySend(now); // 计算超时时间 timeout = Math.min(timeout, delayedTasks.nextTimeout(now)); //调用client.poll方法 clientPoll(timeout, now); now = time.milliseconds(); //清除一些因为连接失败的请求 checkDisconnects(now); // 判断是否需要执行定时任务 if (executeDelayedTasks) delayedTasks.poll(now); // 再次发送unsent中的请求 trySend(now); // 清除一些超时的请求 failExpiredRequests(now); }

我们总结一下这里面的流程:
1、调用ConsumerNetworkClient.trySend方法循环处理unsent队列里的请求
2、计算超时时间
3、调用NetworkClient.poll方法处理I/O事件
4、检测是否有其他线程中断
5、处理一些因为连接失败而未发送的请求
6、检测是否需要处理定时任务
7、调用ConsumerNetworkClient.trySend方法循环处理unsent队列里的请求
8、处理一些超时的请求
public void pollNoWakeup() { disableWakeups(); try { poll(0, time.milliseconds(), false); } finally { enableWakeups(); } }

这个方法表示不可被中断的poll方法,在执行poll方法之前disableWakeups方法会将wakeupDisabledCount加1,这样即使其他线程请求中断也不会被响应。
public boolean poll(RequestFuture future, long timeout) { long begin = time.milliseconds(); long remaining = timeout; long now = begin; do { poll(remaining, now, true); now = time.milliseconds(); long elapsed = now - begin; remaining = timeout - elapsed; } while (!future.isDone() && remaining > 0); return future.isDone(); }

这个方法会阻塞到future完成
public RequestFuture send(Node node, ApiKeys api, AbstractRequest request) { long now = time.milliseconds(); //构造一个回调方法 RequestFutureCompletionHandler future = new RequestFutureCompletionHandler(); RequestHeader header = client.nextRequestHeader(api); //封装RequestSend RequestSend send = new RequestSend(node.idString(), header, request.toStruct()); put(node, new ClientRequest(now, true, send, future)); return future; }

send方法会将请求封装成ClientRequest放到unsent队列里面。

    推荐阅读