flink|Flink heartbeat logic walkthrough

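Background

I recently read a Flink issue about piggybacking the TaskManager's task status on the heartbeat as its payload. The goal is to cope with TaskExecutor#updateTaskExecutionState failing to deliver task state changes from the TM to the JM because of transient network errors. If the lost update carries a terminal state, the JM never learns that the task has finished. See the issue for the full discussion.
Issue: https://issues.apache.org/jira/browse/FLINK-17075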
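To make the idea concrete, here is a minimal sketch of the kind of comparison the JM can do once the TM's heartbeat payload lists the executions it is running: diff that set against the executions the JM believes are deployed on that TM. The names DeploymentReconciler, reconcile and ReconciliationResult are hypothetical, not Flink's API, and executions are assumed to be identified by plain string IDs.

```java
import java.util.HashSet;
import java.util.Set;

// Entry point: diffs the JM's view against a TM report (hypothetical example data).
class ReconciliationDemo {
    public static void main(String[] args) {
        Set<String> expectedByJm = Set.of("exec-1", "exec-2", "exec-3");
        Set<String> reportedByTm = Set.of("exec-1", "exec-3", "exec-4");
        ReconciliationResult result = DeploymentReconciler.reconcile(expectedByJm, reportedByTm);
        System.out.println("missing on TM: " + result.missing); // prints missing on TM: [exec-2]
        System.out.println("unknown to JM: " + result.unknown); // prints unknown to JM: [exec-4]
    }
}

// Result of one reconciliation round (hypothetical type).
final class ReconciliationResult {
    final Set<String> missing; // the JM expects it on the TM, but the TM no longer reports it
    final Set<String> unknown; // the TM reports it, but the JM has no record of it

    ReconciliationResult(Set<String> missing, Set<String> unknown) {
        this.missing = missing;
        this.unknown = unknown;
    }
}

// Hypothetical helper: plain set differences in both directions.
final class DeploymentReconciler {
    private DeploymentReconciler() {}

    static ReconciliationResult reconcile(Set<String> expectedByJm, Set<String> reportedByTm) {
        Set<String> missing = new HashSet<>(expectedByJm);
        missing.removeAll(reportedByTm);
        Set<String> unknown = new HashSet<>(reportedByTm);
        unknown.removeAll(expectedByJm);
        return new ReconciliationResult(missing, unknown);
    }
}
```

In this sketch, an execution in missing is exactly the case the issue worries about: the TM no longer runs it (for example because its terminal-state update was lost), so the JM can act on it instead of waiting forever. An execution in unknown is running on the TM without the JM tracking it.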
flink heartbeat
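With that as motivation, let's first study the heartbeat mechanism between the TM and the JM. Start with the method the JM uses to handle heartbeats, JobMasterGateway#heartbeatFromTaskManager. It is an RPC method, so let's see where the TaskManager calls it. The only call site is TaskExecutor#establishJobManagerConnection: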

```java
private void establishJobManagerConnection(JobTable.Job job, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
    // ...

    // The receiveHeartbeat method of the HeartbeatTarget registered with
    // jobManagerHeartbeatManager is what sends the heartbeat to the JM.
    // monitor the job manager as heartbeat target
    jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<TaskExecutorToJobManagerHeartbeatPayload>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
            jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
            // request heartbeat will never be called on the task manager side
        }
    });

    internalOfferSlotsToJobManager(establishedConnection);
}
```
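The pattern here is an adapter: the TM hands its heartbeat manager a HeartbeatTarget whose receiveHeartbeat simply forwards to the JM's RPC gateway. A stripped-down sketch follows; HeartbeatTarget, JobMasterGatewayStub, RecordingJobMasterGateway and TaskSideTargets are simplified, hypothetical stand-ins (String resource IDs and payloads) rather than Flink's real types.

```java
import java.util.ArrayList;
import java.util.List;

class TargetAdapterDemo {
    public static void main(String[] args) {
        RecordingJobMasterGateway gateway = new RecordingJobMasterGateway();
        HeartbeatTarget<String> jmTarget = TaskSideTargets.forJobMaster(gateway);

        // what the TM's heartbeat manager does when it answers a JM heartbeat request
        jmTarget.receiveHeartbeat("tm-1", "tm payload");
        System.out.println(gateway.received); // prints [tm-1 -> tm payload]
    }
}

// Simplified stand-in for Flink's HeartbeatTarget<I>, with String resource IDs.
interface HeartbeatTarget<I> {
    void receiveHeartbeat(String resourceId, I payload);

    void requestHeartbeat(String resourceId, I payload);
}

// Simplified stand-in for the JM RPC gateway.
interface JobMasterGatewayStub {
    void heartbeatFromTaskManager(String resourceId, String payload);
}

// Fake gateway that records calls instead of sending RPCs.
class RecordingJobMasterGateway implements JobMasterGatewayStub {
    final List<String> received = new ArrayList<>();

    @Override
    public void heartbeatFromTaskManager(String resourceId, String payload) {
        received.add(resourceId + " -> " + payload);
    }
}

final class TaskSideTargets {
    private TaskSideTargets() {}

    // Builds the target the TM registers for its JM: "receiving" a heartbeat means forwarding it over RPC.
    static HeartbeatTarget<String> forJobMaster(JobMasterGatewayStub gateway) {
        return new HeartbeatTarget<String>() {
            @Override
            public void receiveHeartbeat(String resourceId, String payload) {
                gateway.heartbeatFromTaskManager(resourceId, payload);
            }

            @Override
            public void requestHeartbeat(String resourceId, String payload) {
                // the JM never asks the TM-side target for a heartbeat
                throw new UnsupportedOperationException("requestHeartbeat is not used on the TM side");
            }
        };
    }
}
```

Flink's version leaves requestHeartbeat empty; throwing UnsupportedOperationException is a choice of this sketch so that an unexpected call gets noticed.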

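Next, when is the receiveHeartbeat method of the HeartbeatTarget registered with the HeartbeatManager actually called? First find out which HeartbeatManager subclass TaskExecutor's jobManagerHeartbeatManager really is: the initialization code shows that it is a HeartbeatManagerImpl instance. Here is the place in that class that calls HeartbeatTarget#receiveHeartbeat: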
```java
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat request from {}.", requestOrigin);

        // look up the HeartbeatTarget registered for requestOrigin
        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

        if (heartbeatTarget != null) {
            // hand the received payload to the heartbeat listener
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
            }

            // call the target's receiveHeartbeat to send our own heartbeat back
            heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
        }
    }
}
```
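The control flow of the receiving side fits in a few lines. In this sketch, ReceiverSide, Target and Listener are hypothetical simplifications of HeartbeatManagerImpl, HeartbeatTarget and HeartbeatListener. Recording the arrival time in lastHeartbeat is an assumed stand-in for the liveness bookkeeping behind reportHeartbeat, and timestamps are passed in explicitly to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReceiverDemo {
    public static void main(String[] args) {
        List<String> repliesToJm = new ArrayList<>();
        Listener<String, String> listener = new Listener<String, String>() {
            @Override
            public void reportPayload(String from, String payload) {
                System.out.println("payload from " + from + ": " + payload);
            }

            @Override
            public String retrievePayload(String from) {
                return "tm status";
            }
        };

        ReceiverSide<String, String> tm = new ReceiverSide<>("tm-1", listener);
        tm.monitorTarget("jm-1", (from, payload) -> repliesToJm.add(from + " replies with " + payload));

        tm.requestHeartbeat("jm-1", "slot report", 1_000L);     // known origin: answered
        tm.requestHeartbeat("jm-other", "slot report", 1_000L); // unknown origin: ignored
        System.out.println(repliesToJm); // prints [tm-1 replies with tm status]
    }
}

// Simplified HeartbeatTarget: only the reply direction is needed here.
interface Target<O> {
    void receiveHeartbeat(String from, O payload);
}

// Simplified HeartbeatListener: consumes incoming payloads, supplies outgoing ones.
interface Listener<I, O> {
    void reportPayload(String from, I payload);

    O retrievePayload(String from);
}

final class ReceiverSide<I, O> {
    private final String ownId;
    private final Listener<I, O> listener;
    private final Map<String, Target<O>> targets = new HashMap<>();
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    private boolean stopped;

    ReceiverSide(String ownId, Listener<I, O> listener) {
        this.ownId = ownId;
        this.listener = listener;
    }

    void monitorTarget(String targetId, Target<O> target) {
        targets.put(targetId, target);
    }

    void stop() {
        stopped = true;
    }

    Long lastHeartbeatOf(String targetId) {
        return lastHeartbeat.get(targetId);
    }

    // Mirrors the control flow of HeartbeatManagerImpl#requestHeartbeat.
    void requestHeartbeat(String requestOrigin, I payload, long nowMillis) {
        if (stopped) {
            return;
        }
        Target<O> target = targets.get(requestOrigin);
        if (target == null) {
            return; // not a monitored target: ignore the request
        }
        lastHeartbeat.put(requestOrigin, nowMillis); // assumed liveness bookkeeping
        if (payload != null) {
            listener.reportPayload(requestOrigin, payload);
        }
        // answer with our own payload for that target
        target.receiveHeartbeat(ownId, listener.retrievePayload(requestOrigin));
    }
}
```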

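Next, where is this requestHeartbeat method called? (HeartbeatManagerImpl has it because HeartbeatManager extends the HeartbeatTarget interface.) It is called from TaskExecutor#heartbeatFromJobManager, which is an RPC method: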
```java
@Override
public void heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
    jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
}
```

So who calls heartbeatFromJobManager? It is called from the HeartbeatTarget that JobMaster#registerTaskManager registers for each TaskManager:
```java
@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
        final String taskManagerRpcAddress,
        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
        final Time timeout) {

    final TaskManagerLocation taskManagerLocation;
    try {
        taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
    } catch (Throwable throwable) {
        final String errMsg = String.format(
            "Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s",
            unresolvedTaskManagerLocation.getExternalAddress(),
            throwable.getMessage());
        log.error(errMsg);
        return CompletableFuture.completedFuture(new RegistrationResponse.Decline(errMsg));
    }

    final ResourceID taskManagerId = taskManagerLocation.getResourceID();

    if (registeredTaskManagers.containsKey(taskManagerId)) {
        final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
        return CompletableFuture.completedFuture(response);
    } else {
        return getRpcService()
            .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
            .handleAsync(
                (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    if (throwable != null) {
                        return new RegistrationResponse.Decline(throwable.getMessage());
                    }

                    slotPool.registerTaskManager(taskManagerId);
                    registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));

                    // monitor the task manager as heartbeat target
                    taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<AllocatedSlotReport>() {
                        @Override
                        public void receiveHeartbeat(ResourceID resourceID, AllocatedSlotReport payload) {
                            // the task manager will not request heartbeat, so this method will never be called currently
                        }

                        @Override
                        public void requestHeartbeat(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
                            taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);
                        }
                    });

                    return new JMTMRegistrationSuccess(resourceId);
                },
                getMainThreadExecutor());
    }
}
```
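Stripped of Flink types, registerTaskManager does two things worth noting: a repeated registration from an already-known TM succeeds immediately without reconnecting, and only a successful connect leads to the TM being monitored as a heartbeat target. A hypothetical sketch: JobMasterSketch, RpcConnector, TaskExecutorGatewayStub and the String responses are illustrative, and a Consumer<String> stands in for the HeartbeatTarget#requestHeartbeat callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class RegistrationDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // Fake TM gateway: records the heartbeat requests it receives.
        TaskExecutorGatewayStub tm = (jmId, slotReport) -> sent.add(jmId + " -> tm-1: " + slotReport);
        JobMasterSketch jm = new JobMasterSketch("jm-1", address -> CompletableFuture.completedFuture(tm));

        System.out.println(jm.registerTaskManager("tm-1", "rpc://tm-1").join()); // prints Success(jm-1)
        System.out.println(jm.registerTaskManager("tm-1", "rpc://tm-1").join()); // prints Success(jm-1); no reconnect
        jm.heartbeatTargets.get("tm-1").accept("2 slots");
        System.out.println(sent + ", connects=" + jm.connectCalls); // prints [jm-1 -> tm-1: 2 slots], connects=1
    }
}

interface TaskExecutorGatewayStub {
    void heartbeatFromJobManager(String jmId, String slotReport);
}

interface RpcConnector {
    CompletableFuture<TaskExecutorGatewayStub> connect(String address);
}

final class JobMasterSketch {
    private final String resourceId;
    private final RpcConnector rpc;
    private final Map<String, TaskExecutorGatewayStub> registered = new HashMap<>();
    // TM id -> action that sends that TM a heartbeat request (stand-in for HeartbeatTarget#requestHeartbeat)
    final Map<String, Consumer<String>> heartbeatTargets = new HashMap<>();
    int connectCalls;

    JobMasterSketch(String resourceId, RpcConnector rpc) {
        this.resourceId = resourceId;
        this.rpc = rpc;
    }

    CompletableFuture<String> registerTaskManager(String tmId, String address) {
        if (registered.containsKey(tmId)) {
            // already known: acknowledge again without reconnecting
            return CompletableFuture.completedFuture("Success(" + resourceId + ")");
        }
        connectCalls++;
        return rpc.connect(address).handle((gateway, error) -> {
            if (error != null) {
                return "Decline(" + error.getMessage() + ")";
            }
            registered.put(tmId, gateway);
            heartbeatTargets.put(tmId, slotReport -> gateway.heartbeatFromJobManager(resourceId, slotReport));
            return "Success(" + resourceId + ")";
        });
    }
}
```

Flink runs the callback with handleAsync(..., getMainThreadExecutor()), so registeredTaskManagers is only mutated on the JobMaster's main thread. The sketch uses handle and already-completed futures, so everything runs on the caller's thread and needs no extra synchronization.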

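While we're at it, look at the concrete class of taskManagerHeartbeatManager in JobMaster, HeartbeatManagerSenderImpl. Besides being a heartbeat manager, it also implements the Runnable interface. Here is its run method: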
```java
@Override
public void run() {
    if (!stopped) {
        log.debug("Trigger heartbeat request.");
        // iterate over all registered heartbeat targets and send each one a heartbeat request
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
            requestHeartbeat(heartbeatMonitor);
        }

        // run this Runnable again after heartbeatPeriod milliseconds
        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }
}

private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
    O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
    final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
```
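The run method is a self-rescheduling Runnable. Below is a hypothetical sketch of the same shape: HeartbeatSender and DelayScheduler are illustrative names, a BiConsumer<String, String> stands in for HeartbeatTarget#requestHeartbeat, and a Function stands in for HeartbeatListener#retrievePayload. The scheduler is injected so the loop can be driven by a real ScheduledExecutorService (as in main) or stepped by hand in a test; like HeartbeatManagerSenderImpl, it schedules its first round with zero delay from the constructor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

class SenderDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
        List<String> received = Collections.synchronizedList(new ArrayList<>());

        HeartbeatSender sender = new HeartbeatSender("jm-1", 100L,
                targetId -> "slot report for " + targetId,
                (task, delayMillis) -> mainThread.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
        sender.monitorTarget("tm-1", (from, payload) -> received.add(from + ": " + payload));

        Thread.sleep(350L);
        sender.stop();
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        // the exact count depends on thread timing
        System.out.println(received.size() + " heartbeat requests sent to tm-1");
    }
}

// Schedules a task after a delay; injected so tests can drive the loop by hand.
interface DelayScheduler {
    void schedule(Runnable task, long delayMillis);
}

final class HeartbeatSender implements Runnable {
    private final String ownId;
    private final long heartbeatPeriodMillis;
    private final Function<String, String> payloadForTarget;
    private final DelayScheduler scheduler;
    private final Map<String, BiConsumer<String, String>> targets = new ConcurrentHashMap<>();
    private volatile boolean stopped;

    HeartbeatSender(String ownId, long heartbeatPeriodMillis,
                    Function<String, String> payloadForTarget, DelayScheduler scheduler) {
        this.ownId = ownId;
        this.heartbeatPeriodMillis = heartbeatPeriodMillis;
        this.payloadForTarget = payloadForTarget;
        this.scheduler = scheduler;
        // first round runs immediately; later rounds reschedule themselves
        scheduler.schedule(this, 0L);
    }

    void monitorTarget(String targetId, BiConsumer<String, String> requestHeartbeat) {
        targets.put(targetId, requestHeartbeat);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // not rescheduled, so the loop ends here
        }
        // send every registered target a heartbeat request carrying its payload
        targets.forEach((targetId, requestHeartbeat) ->
                requestHeartbeat.accept(ownId, payloadForTarget.apply(targetId)));
        scheduler.schedule(this, heartbeatPeriodMillis);
    }
}
```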

At this point the whole heartbeat trigger path is clear. Finally, let's look at how taskManagerHeartbeatManager is initialized in JobMaster. It ends up calling the HeartbeatManagerSenderImpl constructor:
```java
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
        heartbeatTimeout,
        ownResourceID,
        heartbeatListener,
        mainThreadExecutor,
        log,
        heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    // the first heartbeat request is triggered here, with zero delay
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
```
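The constructor also forwards heartbeatTimeout to its superclass; as the name suggests, it bounds how long a target may stay silent. One simple way to model such a timeout is shown below as a hypothetical sketch, not Flink's actual monitor logic: remember when each target was last heard from, and treat it as timed out once more than heartbeatTimeout milliseconds have passed. TimeoutTracker and its methods are illustrative names, timestamps are supplied by the caller, and 50 000 ms is only an example value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TimeoutDemo {
    public static void main(String[] args) {
        TimeoutTracker tracker = new TimeoutTracker(50_000L); // example heartbeatTimeout
        tracker.reportHeartbeat("tm-1", 0L);
        tracker.reportHeartbeat("tm-2", 0L);
        tracker.reportHeartbeat("tm-1", 40_000L); // tm-1 keeps answering, tm-2 goes silent
        System.out.println(tracker.timedOut(60_000L)); // prints [tm-2]
    }
}

// Hypothetical deadline-based timeout check; timestamps are passed in by the caller.
final class TimeoutTracker {
    private final long heartbeatTimeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    TimeoutTracker(long heartbeatTimeoutMillis) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    void reportHeartbeat(String targetId, long nowMillis) {
        lastHeartbeat.put(targetId, nowMillis);
    }

    // Targets silent for longer than the timeout, sorted for stable output.
    List<String> timedOut(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (nowMillis - entry.getValue() > heartbeatTimeoutMillis) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

A timer-based alternative re-arms a scheduled timeout task on every heartbeat instead of scanning; the deadline check is used here only because it is easy to test deterministically.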

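Summary
The heartbeat logic between the JM and the TM works roughly as follows:

1. JM: when the JobMaster starts, it constructs a HeartbeatManagerSenderImpl and schedules it periodically on the main thread executor; each run calls requestHeartbeat on every registered HeartbeatTarget.
2. TM: when it connects to a job's JobMaster, it calls the JM's registerTaskManager method.
3. JM: inside registerTaskManager, the JM builds a HeartbeatTarget whose requestHeartbeat calls the TM's heartbeatFromJobManager RPC method, and registers it with the HeartbeatManagerSenderImpl.
4. TM: heartbeatFromJobManager goes through HeartbeatManagerImpl#requestHeartbeat and the HeartbeatTarget registered in establishJobManagerConnection, which finally calls the JM's heartbeatFromTaskManager to report the heartbeat to the JM.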

So the whole flow boils down to:
JM periodically triggers a heartbeat request -> TM receives the request -> TM reports its heartbeat to the JM
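The whole round trip can be traced with two in-process fakes. This is a hypothetical, single-threaded sketch: FakeJobMaster and FakeTaskExecutor are illustrative names, direct method calls stand in for the RPCs, tick() stands in for one run of HeartbeatManagerSenderImpl, and the payloads are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

class HeartbeatFlowDemo {
    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        FakeJobMaster jm = new FakeJobMaster("jm-1", trace);
        FakeTaskExecutor tm = new FakeTaskExecutor("tm-1");

        tm.register(jm); // TM calls the JM's registerTaskManager
        jm.tick();       // one heartbeat round triggered by the JM
        trace.forEach(System.out::println);
        // prints:
        // jm-1 -> tm-1: heartbeatFromJobManager(slot report)
        // tm-1 -> jm-1: heartbeatFromTaskManager(task executor payload)
    }
}

final class FakeJobMaster {
    private final String id;
    private final List<String> trace;
    private final List<FakeTaskExecutor> monitored = new ArrayList<>();

    FakeJobMaster(String id, List<String> trace) {
        this.id = id;
        this.trace = trace;
    }

    // registerTaskManager: start monitoring the TM as a heartbeat target
    void registerTaskManager(FakeTaskExecutor tm) {
        monitored.add(tm);
    }

    // one round: send every monitored TM a heartbeat request
    void tick() {
        for (FakeTaskExecutor tm : monitored) {
            trace.add(id + " -> " + tm.id() + ": heartbeatFromJobManager(slot report)");
            tm.heartbeatFromJobManager(this, "slot report");
        }
    }

    // the TM's reply arrives here
    void heartbeatFromTaskManager(String tmId, String payload) {
        trace.add(tmId + " -> " + id + ": heartbeatFromTaskManager(" + payload + ")");
    }
}

final class FakeTaskExecutor {
    private final String id;
    String lastSlotReport; // what the TM-side listener would receive

    FakeTaskExecutor(String id) {
        this.id = id;
    }

    String id() {
        return id;
    }

    void register(FakeJobMaster jm) {
        jm.registerTaskManager(this);
    }

    // hand the payload to the "listener", then reply with our own heartbeat
    void heartbeatFromJobManager(FakeJobMaster from, String slotReport) {
        lastSlotReport = slotReport;
        from.heartbeatFromTaskManager(id, "task executor payload");
    }
}
```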
