spark stage 重试导致 stage 无法正常结束,一直在等待中
spark stage 重试导致 stage 无法正常结束,一直在等待中
线上 spark 版本,2.4.1sparkUI 现象 stage tab 页
此时任务已被用户 kill
https://github.com/apache/spa...
文章图片
driver log 日志
文章图片
文章图片
分析现象一:在任务被用户 kill 前,stage tab 页中的 stage16.1 一直卡在 577/685 的任务进度,而 driver 日志中 TaskSetManager 管理的 stage16.1 的进度已经 685/685 了,然后就停止不动通过定位日志代码,TaskSetManager.handleSuccessfulTask,557/685 中的 557 是 tasksSuccessful 的值
if (!successful(index)) {
tasksSuccessful += 1
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
s" ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
tasksSuccessful 个数单由 taskSetManager 维护,并且它的增加有两个方式,一个是上面的这种方式,一个是在创建 taskSetManager 的时候 markPartitionCompleted 标记已经完成的分区为成功,代码如下
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
// only create a BitSet once for a certain stage since we only remove
// that stage when an active TaskSetManager succeed.
stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
// TaskSet got submitted by DAGScheduler may have some already completed
// tasks since DAGScheduler does not always know all the tasks that have
// been completed by other tasksets when completing a stage, so we mark
// those tasks as finished here to avoid launching duplicate tasks, while
// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
}
tsm
}
结合对于因为 fetch 导致 fail 的 stage 来说,重试的 stage 会复用旧的 stage id 的 partitions 完成信息 创建 TaskSetManager,然后标记已完成的分区为成功
文章图片
分析现象二:driver 日志中 stage 16.1 的完成任务起始值是 109/685
INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 14 (run at ThreadPoolExecutor.java:1149) and ShuffleMapStage 16 (run at ThreadPoolExecutor.java:1149) due to fetch failure
整体分析 大致看了下有用的日志信息,开始整体分析。
问题1:driver 日志中已经 685/685 进度完成了,为什么没有触发下一个 stage?并且 stage ui 中是 577/685 的进度
任务完成时 CoarseGrainedExecutorBackend 会发送 StatusUpdate 的任务状态变更信息,CoarseGrainedSchedulerBackend 会获取到 StatusUpdate 的变更信息,然后调用相应的 statusUpdate 方法,使用 TaskResultGetter 线程池异步处理完成的任务。
异步处理完成任务的线程中会调用 TaskSchedulerImpl.handleSuccessfulTask,再由 TaskSetManager.handleSuccessfulTask 进行处理,这个方法中就会输出 driver 中 685/685 的日志信息,然后调用 DAGScheduler 的 taskEnded 方法发送 CompletionEvent 信息。
DAGScheduler 获取到 CompletionEvent 信息,进行判断。这里是核心逻辑,如下代码:
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) &&
smt.epoch <= executorFailureEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
}if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)// This call to increment the epoch may not be strictly necessary, but it is retained
// for now in order to minimize the changes in behavior from an earlier version of the
// code. This existing behavior of always incrementing the epoch following any
// successful shuffle map stage completion may have benefits by causing unneeded
// cached map outputs to be cleaned up earlier on executors. In the future we can
// consider removing this call, but this will require some extra investigation.
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
mapOutputTracker.incrementEpoch()clearCacheLocs()if (!shuffleStage.isAvailable) {
// Some tasks had failed;
let's resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
如果 stage 是 running 状态,并且 pendingPartitions 为空,则会进入 submitStage 或者 submitWaitingChildStages 的逻辑分支,否则不做处理(这里就是为什么任务一直等待的原因,pendingParitions 一直不为空)。那么现在的问题应该就是 pendingParitions。
根据 DAGScheduler 的任务构建分发可以知道,pendingParitions 的个数就是 missing tasks 的个数,就是 685。missing tasks 的个数,stage 会调用 mapOutputTrackerMaster 的 findMissingPartitions 方法调用,代码如下:
/**
* Returns the sequence of partition ids that are missing (i.e. needs to be computed).
*/
def findMissingPartitions(): Seq[Int] = synchronized {
val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null)
assert(missing.size == numPartitions - _numAvailableOutputs,
s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
missing
}
可以看出,和 mapStatuses 的状态有关系,会过滤出 mapStatuses 状态为 null 的分区
文章图片
由上图可知,重试前 stage 16 的任务完成数为 423,那么,总共 1000分区,1000 - 423 = 577,不等于 stage 16.1 重试要跑的 685 个,然而 findMissingPartitions 却找到了 685 个,说明异常情况下,mapStatues 中的分区状态可能会被清除,比如 executor lose 等,map 分区存储没了,自然没必要保留 mapStatues 中的地址信息了。
文章图片
注意日志中的一条信息:
21/06/26 01:28:54 INFO scheduler.DAGScheduler: Shuffle files lost for executor: 2 (epoch 6)
21/06/26 01:28:54 INFO scheduler.DAGScheduler: Resubmitting failed stages
if (fileLost &&
(!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
shuffleFileLostEpoch(execId) = currentEpoch
hostToUnregisterOutputs match {
case Some(host) =>
logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
mapOutputTracker.removeOutputsOnHost(host)
case None =>
logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
mapOutputTracker.removeOutputsOnExecutor(execId)
}
}
就可以搜到如上代码,mapOutputTracker.removeOutputsOnExecutor(execId),就会把这个 stage 上在 executor 2 的 MapStatus 信息清除了。搜了一下,这 423 个完成的任务,在 executor 2 上的刚好 108 个,加上没有跑完的 577 个,刚好 685 个,所以 DAGScheduler 找到了 685 个未完成的分区,并且提交给 TaskSchedulerImpl 创建 TaskSetManager 进行提交了。
那么问题来了,看日志是从 109 个任务开始完成的,也就是说那 108 个任务压根没有提交运行,导致 pendingPartitions 一直处于等待这 108 个任务完成。
日志明显 109/685,从日志下手,上文也提到了,109 指的是 tasksSuccessful 变量。初始化代码如下:
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
// only create a BitSet once for a certain stage since we only remove
// that stage when an active TaskSetManager succeed.
stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
// TaskSet got submitted by DAGScheduler may have some already completed
// tasks since DAGScheduler does not always know all the tasks that have
// been completed by other tasksets when completing a stage, so we mark
// those tasks as finished here to avoid launching duplicate tasks, while
// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
}
tsm
}
//
private[scheduler] def markPartitionCompleted(
partitionId: Int,
taskInfo: Option[TaskInfo]): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
if (speculationEnabled && !isZombie) {
taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
}
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
if (!isZombie) {
sched.stageIdToFinishedPartitions -= stageId
isZombie = true
}
}
maybeFinishTaskSet()
}
}
}
stageIdToFinishedPartitions 获取的是原来 stage 16 完成的分区信息,然后对新的 TaskSetManager successful 进行标记,successful判断,这里的 successful 只是判断 task 是否是终结状态,和成功与否没有关系。所以,这里的 108 个任务 successful 就会被标记为 true 的状态,并且 taskSuccessful + 1,这也是为什么 driver 日志中 finish task 后起始任务是 109 开始了。
TaskSchedulerImpl resourceOffers 获取任务时会通过 successful 判断分区是否成功执行过。这时候那么 685 个任务只有 477 个任务会被分发执行,剩余 108 个不会分发。然后 stage 中的 pendingParitions 个数在重试的 stage task 全部完成后,依然还会剩余 108 个,导致当前 job 无法暂停,一直等待其它任务分发完成。
原因总结
重试stage,dag 在创建 taskSets 的时候,把 executor lost 的任务也标记为需要重跑的任务。但是在创建 taskSetManager 的时候。复用的是曾经 stage 记录的 finishPartions BitSet,这里并没有把 executor lost 的进行去除,导致在标记 partitions 成功的时候后,把 executor lost 的也标记上了。使这部分任务无法提交,dag 一直等待不停止。理论解决方式
- 在标记的时候判断 executor 的失败方式,如果只是 fetch data 失败, executor 存活,说明之前的 map 数据还有,就没必要重跑,可以标记为成功,但是 pendingPartitions 这里也要进行移除,不能只是标记成功。如果 executor 都被移除,新添了一个 executor,就需要重跑了
- 或者可以不标记,只要是提交过来的 missingTask,都提交,不管失败原因
3月7号,新增了这个标记
文章图片
4月14号这个提交被 revert了
文章图片
【spark stage 重试导致 stage 无法正常结束,一直在等待中】具体讨论可以看 https://github.com/apache/spa...,采用的是第二种方案
解决方式
看了下,这个在 spark 2.4.X 的分支中,应该是 2.4.2 才合进去
推荐阅读
- Spark|Spark 数据倾斜及其解决方案
- Spark--java.util.NoSuchElementException:|Spark--java.util.NoSuchElementException: None.get at at
- spark|spark 资源动态释放
- SparkStreaming读Kafka-|SparkStreaming读Kafka- Couldn't find leaders for Set
- spark|spark UDAF根据某列去重求合 distinct sum
- 大数据|spark UDAF 自定义聚合函数 UserDefinedAggregateFunction 带条件的去重操作
- Python3实战Spark大数据分析及调度
- Gank Spark
- OJ|POJ 2686 Traveling by Stagecoach (状态压缩DP)
- Spring|springboot项目(spark 2 操作es6+)