Background
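I recently read a Flink issue that proposes letting the heartbeat carry the TaskManager's task status as its payload. The motivation: TaskExecutor#updateTaskExecutionState can fail to deliver a state change from the TM to the JM because of a transient network error. If the lost notification concerns a terminal state, the JM never learns that the task on that TM has ended. See the issue for the full discussion.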
Issue: https://issues.apache.org/jira/browse/FLINK-17075
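The core idea can be illustrated with a minimal sketch. It uses a hypothetical, heavily simplified model. TaskState, JobMasterView and onHeartbeatPayload are invented names, not Flink classes. The merge rule (a reported terminal state overrides a non-terminal one the JM still holds) is an assumption, not necessarily what the FLINK-17075 change does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the FLINK-17075 idea; none of these are Flink classes.
enum TaskState {
    RUNNING, FINISHED, FAILED;

    boolean isTerminal() {
        return this == FINISHED || this == FAILED;
    }
}

class JobMasterView {
    // The JM's view of each task's state, keyed by a task id.
    final Map<String, TaskState> knownStates = new HashMap<>();

    // Normal path: a direct state-update RPC, which may be lost on the network.
    void updateTaskExecutionState(String taskId, TaskState state) {
        knownStates.put(taskId, state);
    }

    // Backup path (assumed merge rule): the heartbeat payload carries the TM's current
    // task states, so a lost terminal-state update is repaired on a later heartbeat.
    void onHeartbeatPayload(Map<String, TaskState> reportedStates) {
        reportedStates.forEach((taskId, state) -> {
            TaskState known = knownStates.get(taskId);
            if (known == null || (!known.isTerminal() && state.isTerminal())) {
                knownStates.put(taskId, state);
            }
        });
    }
}

class PayloadReconciliationDemo {
    static TaskState run() {
        JobMasterView jm = new JobMasterView();
        jm.updateTaskExecutionState("task-1", TaskState.RUNNING);
        // The FINISHED update RPC is "lost": it is simply never delivered.
        Map<String, TaskState> payload = new HashMap<>();
        payload.put("task-1", TaskState.FINISHED);
        jm.onHeartbeatPayload(payload);
        return jm.knownStates.get("task-1");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints FINISHED
    }
}
```

In this model, the TM repeats its states on every heartbeat. A single lost update RPC therefore only delays the JM's view until the next heartbeat instead of losing it for good.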
Flink heartbeat
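This issue is a good reason to study the heartbeat mechanism between the TM and the JM. Start with the JM method that handles heartbeats: JobMasterGateway#heartbeatFromTaskManager. It is an RPC method. The only place on the TaskManager side that calls it is TaskExecutor#establishJobManagerConnection: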
private void establishJobManagerConnection(JobTable.Job job, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
    ...
    // The receiveHeartbeat method of the HeartbeatTarget registered with
    // jobManagerHeartbeatManager is what sends the heartbeat to the JM.
    // monitor the job manager as heartbeat target
    jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<TaskExecutorToJobManagerHeartbeatPayload>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
            jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
            // request heartbeat will never be called on the task manager side
        }
    });

    internalOfferSlotsToJobManager(establishedConnection);
}
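The registration step can be modeled in a few lines of plain Java. This sketch rests on several assumptions. SimpleHeartbeatTarget and TmHeartbeatRegistry are hypothetical stand-ins for HeartbeatTarget and HeartbeatManager. IDs are strings instead of ResourceID, and the JM gateway is just a BiConsumer. The sketch shows the asymmetry on the TM side: receiveHeartbeat forwards to the JM, while requestHeartbeat is deliberately empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for Flink's HeartbeatTarget: one callback per direction.
interface SimpleHeartbeatTarget<P> {
    void receiveHeartbeat(String senderId, P payload);

    void requestHeartbeat(String requesterId, P payload);
}

// Hypothetical stand-in for the TM-side heartbeat manager; here it only stores targets.
class TmHeartbeatRegistry<P> {
    final Map<String, SimpleHeartbeatTarget<P>> targets = new HashMap<>();

    void monitorTarget(String targetId, SimpleHeartbeatTarget<P> target) {
        targets.put(targetId, target);
    }
}

class TmRegistrationDemo {
    // Registers the JM the way establishJobManagerConnection does: receiveHeartbeat
    // forwards to the JM "gateway" (a BiConsumer here), requestHeartbeat does nothing.
    static TmHeartbeatRegistry<String> connect(String jmId, BiConsumer<String, String> jmGateway) {
        TmHeartbeatRegistry<String> registry = new TmHeartbeatRegistry<>();
        registry.monitorTarget(jmId, new SimpleHeartbeatTarget<String>() {
            @Override
            public void receiveHeartbeat(String senderId, String payload) {
                jmGateway.accept(senderId, payload); // plays the role of heartbeatFromTaskManager(...)
            }

            @Override
            public void requestHeartbeat(String requesterId, String payload) {
                // Never called on the TM side: the TM does not ask the JM for heartbeats.
            }
        });
        return registry;
    }

    public static void main(String[] args) {
        List<String> receivedByJm = new ArrayList<>();
        TmHeartbeatRegistry<String> registry =
                connect("jm-1", (tmId, payload) -> receivedByJm.add(tmId + ":" + payload));
        registry.targets.get("jm-1").receiveHeartbeat("tm-1", "tm-payload");
        System.out.println(receivedByJm); // prints [tm-1:tm-payload]
    }
}
```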
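Next, let's find out when receiveHeartbeat is called on the HeartbeatTarget registered with the HeartbeatManager. First, which HeartbeatManager implementation is TaskExecutor's jobManagerHeartbeatManager? The initialization code shows it is an instance of HeartbeatManagerImpl. This is where HeartbeatManagerImpl calls HeartbeatTarget#receiveHeartbeat:
@Override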
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat request from {}.", requestOrigin);
        // look up the HeartbeatTarget registered for the requester
        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);
        if (heartbeatTarget != null) {
            // pass the requester's payload to the HeartbeatListener
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
            }

            // call the target's receiveHeartbeat to report our own heartbeat back
            heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
        }
    }
}
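The receiver-side control flow above can be sketched as follows. ReceiverSideManager and ReplyTarget are hypothetical names. The list of reported payloads stands in for HeartbeatListener#reportPayload, and the Function stands in for HeartbeatListener#retrievePayload. Any timeout bookkeeping inside reportHeartbeat is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical reply channel back to the requester (like HeartbeatTarget#receiveHeartbeat).
interface ReplyTarget<O> {
    void receiveHeartbeat(String senderId, O payload);
}

// Simplified model of the receiver side. I = payload received, O = payload sent back.
class ReceiverSideManager<I, O> {
    private final String ownId;
    private final Map<String, ReplyTarget<O>> targets = new HashMap<>();
    private final Function<String, O> payloadForRequester; // like retrievePayload
    final List<I> reportedPayloads = new ArrayList<>();    // like reportPayload
    boolean stopped;

    ReceiverSideManager(String ownId, Function<String, O> payloadForRequester) {
        this.ownId = ownId;
        this.payloadForRequester = payloadForRequester;
    }

    void monitorTarget(String targetId, ReplyTarget<O> target) {
        targets.put(targetId, target);
    }

    // Ignore requests after stop or from unmonitored origins, record the
    // requester's payload, then reply with our own payload.
    void requestHeartbeat(String requestOrigin, I payload) {
        if (stopped) {
            return;
        }
        ReplyTarget<O> target = targets.get(requestOrigin);
        if (target == null) {
            return; // unknown requester: no reply
        }
        if (payload != null) {
            reportedPayloads.add(payload);
        }
        target.receiveHeartbeat(ownId, payloadForRequester.apply(requestOrigin));
    }
}

class ReceiverSideDemo {
    public static void main(String[] args) {
        List<String> repliesAtJm = new ArrayList<>();
        ReceiverSideManager<String, String> tm =
                new ReceiverSideManager<>("tm-1", requester -> "status-for-" + requester);
        tm.monitorTarget("jm-1", (senderId, p) -> repliesAtJm.add(senderId + ":" + p));

        tm.requestHeartbeat("jm-1", "slot-report");   // known origin: reply sent
        tm.requestHeartbeat("jm-unknown", "ignored"); // unknown origin: dropped
        System.out.println(repliesAtJm);         // prints [tm-1:status-for-jm-1]
        System.out.println(tm.reportedPayloads); // prints [slot-report]
    }
}
```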
Next, where is this requestHeartbeat method (HeartbeatManagerImpl's implementation of HeartbeatTarget#requestHeartbeat) called? It is called from TaskExecutor#heartbeatFromJobManager, which is an RPC method:
@Override
public void heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
    jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
}
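Who calls heartbeatFromJobManager? JobMaster#registerTaskManager registers a HeartbeatTarget with taskManagerHeartbeatManager, and that target's requestHeartbeat method invokes the RPC:
@Override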
public CompletableFuture<RegistrationResponse> registerTaskManager(
        final String taskManagerRpcAddress,
        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
        final Time timeout) {

    final TaskManagerLocation taskManagerLocation;
    try {
        taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
    } catch (Throwable throwable) {
        final String errMsg = String.format(
            "Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s",
            unresolvedTaskManagerLocation.getExternalAddress(),
            throwable.getMessage());
        log.error(errMsg);
        return CompletableFuture.completedFuture(new RegistrationResponse.Decline(errMsg));
    }

    final ResourceID taskManagerId = taskManagerLocation.getResourceID();

    if (registeredTaskManagers.containsKey(taskManagerId)) {
        final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
        return CompletableFuture.completedFuture(response);
    } else {
        return getRpcService()
            .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
            .handleAsync(
                (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    if (throwable != null) {
                        return new RegistrationResponse.Decline(throwable.getMessage());
                    }

                    slotPool.registerTaskManager(taskManagerId);
                    registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));

                    // monitor the task manager as heartbeat target
                    taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<AllocatedSlotReport>() {
                        @Override
                        public void receiveHeartbeat(ResourceID resourceID, AllocatedSlotReport payload) {
                            // the task manager will not request heartbeat, so this method will never be called currently
                        }

                        @Override
                        public void requestHeartbeat(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
                            taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);
                        }
                    });

                    return new JMTMRegistrationSuccess(resourceId);
                },
                getMainThreadExecutor());
    }
}
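Two details of registerTaskManager are worth isolating. A TM that is already registered simply gets another success response. A failed connection becomes a Decline instead of an exception. The following minimal sketch uses a hypothetical JmRegistrationModel with these simplifications:

- Responses are plain strings.
- A TM gateway is a Consumer<String>.
- Plain handle replaces Flink's handleAsync on the main-thread executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical, simplified model of JobMaster#registerTaskManager's bookkeeping.
class JmRegistrationModel {
    // TM id -> callback the JM's heartbeat sender would use to ping that TM
    // (stands in for a HeartbeatTarget whose requestHeartbeat calls heartbeatFromJobManager).
    final Map<String, Consumer<String>> heartbeatTargets = new HashMap<>();

    CompletableFuture<String> registerTaskManager(
            String tmId, CompletableFuture<Consumer<String>> connectFuture) {
        if (heartbeatTargets.containsKey(tmId)) {
            // Already registered: answer success again, no second connection or target.
            return CompletableFuture.completedFuture("Success");
        }
        // Flink uses handleAsync(..., mainThreadExecutor); plain handle keeps this synchronous.
        return connectFuture.handle((tmGateway, throwable) -> {
            if (throwable != null) {
                return "Decline: " + throwable.getMessage();
            }
            heartbeatTargets.put(tmId, tmGateway); // plays the role of monitorTarget(taskManagerId, ...)
            return "Success";
        });
    }
}

class JmRegistrationDemo {
    public static void main(String[] args) {
        JmRegistrationModel jm = new JmRegistrationModel();
        Consumer<String> tmGateway = slotReport -> System.out.println("TM got " + slotReport);
        CompletableFuture<Consumer<String>> connected = CompletableFuture.completedFuture(tmGateway);
        System.out.println(jm.registerTaskManager("tm-1", connected).join()); // prints Success
        System.out.println(jm.registerTaskManager("tm-1", connected).join()); // prints Success again

        CompletableFuture<Consumer<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("unreachable"));
        System.out.println(jm.registerTaskManager("tm-2", failed).join()); // prints a Decline message
        System.out.println(jm.heartbeatTargets.keySet()); // prints [tm-1]
    }
}
```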
While we are at it, look at the concrete class behind JobMaster's taskManagerHeartbeatManager, HeartbeatManagerSenderImpl. It also implements Runnable. Here is its run method:
@Override
public void run() {
    if (!stopped) {
        log.debug("Trigger heartbeat request.");
        // iterate over all registered HeartbeatTargets and send each a heartbeat request
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
            requestHeartbeat(heartbeatMonitor);
        }

        // run this Runnable again after heartbeatPeriod milliseconds
        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }
}

private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
    O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
    final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
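One round of the sender's run method pings every target with its own payload and then re-arms itself. You can test this deterministically by injecting a scheduler that only records the requested delay. DelayScheduler, SenderLoop and the recording lambda are hypothetical. In Flink, the scheduler is the JobMaster's main-thread executor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical scheduler abstraction so a round can be driven by hand.
interface DelayScheduler {
    void schedule(Runnable task, long delayMillis);
}

// Simplified model of the sender's run(): ping every target, then re-arm.
class SenderLoop implements Runnable {
    private final String ownId;
    private final long heartbeatPeriodMillis;
    private final DelayScheduler scheduler;
    private final Function<String, String> payloadFor; // like HeartbeatListener#retrievePayload
    private final Map<String, BiConsumer<String, String>> targets = new LinkedHashMap<>();
    volatile boolean stopped;

    SenderLoop(String ownId, long heartbeatPeriodMillis, DelayScheduler scheduler,
               Function<String, String> payloadFor) {
        this.ownId = ownId;
        this.heartbeatPeriodMillis = heartbeatPeriodMillis;
        this.scheduler = scheduler;
        this.payloadFor = payloadFor;
    }

    // The BiConsumer plays the role of HeartbeatTarget#requestHeartbeat(requesterId, payload).
    void monitorTarget(String targetId, BiConsumer<String, String> requestHeartbeat) {
        targets.put(targetId, requestHeartbeat);
    }

    @Override
    public void run() {
        if (!stopped) {
            for (Map.Entry<String, BiConsumer<String, String>> target : targets.entrySet()) {
                String payload = payloadFor.apply(target.getKey()); // per-target payload
                target.getValue().accept(ownId, payload);
            }
            scheduler.schedule(this, heartbeatPeriodMillis); // next round
        }
    }
}

class SenderLoopDemo {
    public static void main(String[] args) {
        List<Long> scheduledDelays = new ArrayList<>();
        List<String> pings = new ArrayList<>();
        // A recording scheduler: remembers the delay but never runs the task.
        SenderLoop jm = new SenderLoop("jm-1", 10_000L,
                (task, delay) -> scheduledDelays.add(delay), tmId -> "slots-of-" + tmId);
        jm.monitorTarget("tm-1", (from, payload) -> pings.add("tm-1<-" + from + ":" + payload));
        jm.monitorTarget("tm-2", (from, payload) -> pings.add("tm-2<-" + from + ":" + payload));

        jm.run(); // one heartbeat round
        System.out.println(pings);           // prints [tm-1<-jm-1:slots-of-tm-1, tm-2<-jm-1:slots-of-tm-2]
        System.out.println(scheduledDelays); // prints [10000]
    }
}
```

Injecting the scheduler is only a convenience for the sketch. It lets a test run one round by hand and check both the fan-out and the rescheduling delay without waiting on real time.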
At this point, the heartbeat triggering logic is clear. Finally, let's look at how JobMaster initializes taskManagerHeartbeatManager. The call chain ends in the HeartbeatManagerSenderImpl constructor:
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
        heartbeatTimeout,
        ownResourceID,
        heartbeatListener,
        mainThreadExecutor,
        log,
        heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    // this schedules the first heartbeat request, with a delay of 0
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
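The same loop on a real ScheduledExecutorService shows the kickoff. The constructor schedules the first round with delay 0, and every round reschedules itself. SelfSchedulingSender and runRounds are hypothetical names, and counting down a latch stands in for requesting heartbeats. As in the constructor above, the period is assigned before `this` is handed to the executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model: the constructor arms the first round immediately (delay 0),
// and every round re-arms itself after heartbeatPeriodMillis.
class SelfSchedulingSender implements Runnable {
    private final long heartbeatPeriodMillis;
    private final ScheduledExecutorService executor;
    private final CountDownLatch rounds;
    private volatile boolean stopped;

    SelfSchedulingSender(long heartbeatPeriodMillis, ScheduledExecutorService executor,
                         CountDownLatch rounds) {
        // Assign all fields before publishing `this` to the executor.
        this.heartbeatPeriodMillis = heartbeatPeriodMillis;
        this.executor = executor;
        this.rounds = rounds;
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS); // first round right away
    }

    @Override
    public void run() {
        if (!stopped) {
            rounds.countDown(); // stands in for "request a heartbeat from every target"
            executor.schedule(this, heartbeatPeriodMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
    }

    // Returns true once the sender has completed expectedRounds rounds (gives up after 5 s).
    static boolean runRounds(int expectedRounds, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch rounds = new CountDownLatch(expectedRounds);
            SelfSchedulingSender sender = new SelfSchedulingSender(periodMillis, executor, rounds);
            boolean done = rounds.await(5, TimeUnit.SECONDS);
            sender.stop();
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 20L)); // prints true
    }
}
```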
Summary
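The heartbeat logic between the JM and a TM works roughly like this:
- JM: when the JobMaster starts, it constructs a HeartbeatManagerSenderImpl and has its main-thread executor run it periodically. Each run calls requestHeartbeat on every registered HeartbeatTarget.
- TM: when it connects to the JM, the TM calls the JM's registerTaskManager method.
- JM: registerTaskManager builds a HeartbeatTarget whose requestHeartbeat calls the TM's heartbeatFromJobManager RPC, and registers it with the HeartbeatManagerSenderImpl.
- TM: heartbeatFromJobManager ends up calling the JM's heartbeatFromTaskManager, which reports the TM's heartbeat to the JM.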
So the whole flow is:
JM periodically triggers a heartbeat request -> TM receives the request -> TM reports its heartbeat back to the JM
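The whole round trip can be condensed into a single-threaded simulation. MiniJobMaster, MiniTaskExecutor and RoundTripDemo are hypothetical. Direct method calls replace the asynchronous RPCs, and registration and monitoring are collapsed into one connectTo step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-process model of one JM <-> TM heartbeat round.
// Method names mirror the RPCs discussed above; the classes are invented.
class MiniJobMaster {
    private final List<String> log;
    private final List<MiniTaskExecutor> registeredTms = new ArrayList<>();

    MiniJobMaster(List<String> log) {
        this.log = log;
    }

    // TM -> JM: registration; from now on the JM sends this TM heartbeat requests.
    void registerTaskManager(MiniTaskExecutor tm) {
        registeredTms.add(tm);
    }

    // Runs once per heartbeat period on the JM: request a heartbeat from every TM.
    void triggerHeartbeatRound() {
        for (MiniTaskExecutor tm : registeredTms) {
            log.add("JM -> " + tm.id + ": heartbeatFromJobManager");
            tm.heartbeatFromJobManager();
        }
    }

    // TM -> JM: the heartbeat report.
    void heartbeatFromTaskManager(String tmId) {
        log.add(tmId + " -> JM: heartbeatFromTaskManager");
    }
}

class MiniTaskExecutor {
    final String id;
    private MiniJobMaster jobMaster; // the JM this TM monitors as its heartbeat target

    MiniTaskExecutor(String id) {
        this.id = id;
    }

    void connectTo(MiniJobMaster jm) {
        this.jobMaster = jm;
        jm.registerTaskManager(this);
    }

    // JM -> TM: answer the request by reporting a heartbeat back to the JM.
    void heartbeatFromJobManager() {
        if (jobMaster != null) {
            jobMaster.heartbeatFromTaskManager(id);
        }
    }
}

class RoundTripDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniJobMaster jm = new MiniJobMaster(log);
        MiniTaskExecutor tm1 = new MiniTaskExecutor("tm-1");
        MiniTaskExecutor tm2 = new MiniTaskExecutor("tm-2");
        tm1.connectTo(jm);
        tm2.connectTo(jm);
        jm.triggerHeartbeatRound();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints four lines in request/response order. Each TM answers its request before the JM moves on to the next TM, which matches the flow above.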