8.2 MapOutputTracker in the Shuffle Process
1. Overview
MapOutputTracker
tracks the output of intermediate stages, readying the upstream data for the subsequent shuffle. The handles to this data are managed by the BlockManager, which also estimates the data sizes.
On the driver runs the MapOutputTrackerMaster. The whole MapOutputTracker is kept in the SparkEnv structure, and SparkEnv is part of the SparkContext, so anyone holding the sc can reach it.
On the executors runs the MapOutputTrackerWorker.
The two sides communicate through MapOutputTrackerMasterEndpoint, an RPC endpoint registered with Akka.
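Both tracker implementations are reached through SparkEnv. A minimal access sketch, assuming a live SparkContext named sc:

import org.apache.spark.SparkEnv

// Resolves to MapOutputTrackerMaster on the driver and to
// MapOutputTrackerWorker inside an executor
val tracker = SparkEnv.get.mapOutputTracker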
2. Basics of MapOutputTrackerMasterEndpoint
As an RPC endpoint, it handles two kinds of messages; a condensed dispatch sketch follows the list.
- GetMapOutputStatuses(shuffleId: Int)
Given the shuffleId, serializes the corresponding shuffle statuses and checks whether the payload exceeds Akka's maximum frame size (128 MB by default)
- StopMapOutputTracker
Stops this MapOutputTracker
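A condensed sketch of the dispatch, modeled on the Spark 1.x source with logging trimmed (tracker and maxAkkaFrameSize are constructor parameters of the endpoint):

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case GetMapOutputStatuses(shuffleId: Int) =>
    // Serialize the statuses of this shuffle and refuse to send an oversized reply
    val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
    if (mapOutputStatuses.length > maxAkkaFrameSize) {
      context.sendFailure(new SparkException(
        s"Map output statuses were ${mapOutputStatuses.length} bytes, " +
        s"which exceeds the max frame size ($maxAkkaFrameSize bytes)"))
    } else {
      context.reply(mapOutputStatuses)
    }

  case StopMapOutputTracker =>
    // Acknowledge the caller, then shut this endpoint down
    context.reply(true)
    stop()
}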
The class declaration and its documentation are as follows:
/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and executor) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
- The driver-side reference to the master's RPC endpoint
/** Set to the MapOutputTrackerMasterEndpoint living on the driver. */
var trackerEndpoint: RpcEndpointRef = _
- Holds the handles of ShuffleMapTask outputs; from such a handle one can reach the task, then the statuses and blocks it involves, and from there fetch or operate on the data
/**
* This HashMap has different behavior for the driver and the executors.
*
* On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
* driver's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]
- The epoch counter and its lock (see the epoch sketch after this field list)
/**
* Incremented every time a fetch fails so that client nodes know to clear
* their cache of map output locations if this happens.
*/
protected var epoch: Long = 0
protected val epochLock = new AnyRef
- Records which map output locations are currently being fetched
/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]
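Together these fields implement a cache-invalidation protocol: the driver bumps the epoch whenever a map output is lost, and an executor that observes a newer epoch clears its cached mapStatuses so the next read re-fetches from the driver. A minimal sketch, modeled on the tracker's getEpoch/updateEpoch:

def getEpoch: Long = epochLock.synchronized { epoch }

def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
  if (newEpoch > epoch) {
    epoch = newEpoch
    mapStatuses.clear()  // cached locations are stale; force a re-fetch
  }
}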
3. Important Methods
- getMapSizesByExecutorId
Returns the basic information for each shuffle block: its status, which block manager holds it, and its size
/**
 * Called from executors to get the server URIs and output sizes for each shuffle block that
 * needs to be read from a given reduce task.
 *
 * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
 *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
 *         describing the shuffle blocks that are stored at that block manager.
 */
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
}

/**
 * Called from executors to get the server URIs and output sizes for each shuffle block that
 * needs to be read from a given range of map output partitions (startPartition is included but
 * endPartition is excluded from the range).
 *
 * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
 *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
 *         describing the shuffle blocks that are stored at that block manager.
 */
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
  val statuses = getStatuses(shuffleId)
  // Synchronize on the returned array because, on the driver, it gets mutated in place
  statuses.synchronized {
    return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
  }
}
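A usage sketch, roughly how a shuffle reader drives this lookup when a reduce task starts. These internals are private[spark], so the sketch assumes code compiled inside that package; the shuffleId and reduceId values are illustrative:

val tracker = SparkEnv.get.mapOutputTracker
// For reduce partition 3 of shuffle 0: which executors hold map output, and how much
for ((blockManagerId, blocks) <- tracker.getMapSizesByExecutorId(0, 3)) {
  blocks.foreach { case (blockId, size) =>
    println(s"fetch $blockId ($size bytes) from ${blockManagerId.hostPort}")
  }
}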
- getStatuses(shuffleId: Int)
Gets the basic statuses (the MapStatus array) of a shuffle's output
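It consults the local mapStatuses cache first; on a miss, threads coordinate through the fetching set so that only one of them asks the driver per shuffle. A condensed sketch, modeled on the Spark 1.x source with logging omitted:

private def getStatuses(shuffleId: Int): Array[MapStatus] = {
  val statuses = mapStatuses.get(shuffleId).orNull
  if (statuses != null) return statuses

  var fetchedStatuses: Array[MapStatus] = null
  fetching.synchronized {
    // Another thread may already be fetching this shuffle; wait for it to finish
    while (fetching.contains(shuffleId)) {
      try { fetching.wait() } catch { case _: InterruptedException => }
    }
    // Either the earlier fetch filled the cache, or it is our turn to fetch
    fetchedStatuses = mapStatuses.get(shuffleId).orNull
    if (fetchedStatuses == null) fetching += shuffleId
  }
  if (fetchedStatuses == null) {
    try {
      // Ask the driver endpoint, then cache the deserialized result locally
      val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
      fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
      mapStatuses.put(shuffleId, fetchedStatuses)
    } finally {
      fetching.synchronized {
        fetching -= shuffleId
        fetching.notifyAll()  // wake any threads waiting on this shuffle
      }
    }
  }
  if (fetchedStatuses == null) {
    throw new MetadataFetchFailedException(
      shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
  }
  fetchedStatuses
}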