8.2 The Shuffle Process: MapOutputTracker

1. Overview

MapOutputTracker tracks the output of intermediate stages so that the upstream data is ready for the subsequent shuffle. The handles to these outputs are managed by the BlockManager, which also estimates their sizes.

The driver runs a MapOutputTrackerMaster, while each executor runs a MapOutputTrackerWorker; they communicate through MapOutputTrackerMasterEndpoint, an RPC endpoint registered with Akka. The whole MapOutputTracker is kept in the SparkEnv structure, and SparkEnv is part of the SparkContext, so anyone holding the SparkContext can reach this structure.
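For illustration, a minimal sketch of reaching the tracker through SparkEnv; note that both SparkEnv and MapOutputTracker are private[spark], so this only compiles from code inside the org.apache.spark package (e.g. Spark's own sources):

```scala
import org.apache.spark.{MapOutputTracker, SparkEnv}

// SparkEnv.get returns the environment of the running driver or executor;
// the MapOutputTracker (master or worker flavor) hangs off of it.
val tracker: MapOutputTracker = SparkEnv.get.mapOutputTracker
```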
2. Basic Concepts of MapOutputTrackerMasterEndpoint

As an RPC endpoint, it handles two kinds of messages (a sketch of the dispatch follows this list):
  • GetMapOutputStatuses(shuffleId: Int)
    Looks up the map output statuses for the given shuffleId and checks whether the serialized shuffle status structure exceeds Akka's maximum message size (128 MB by default) before replying.
  • StopMapOutputTracker
    Stops this MapOutputTracker.
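An abridged sketch of the dispatch, based on the Spark 1.6 sources (logging and the full error message are trimmed):

```scala
private[spark] class MapOutputTrackerMasterEndpoint(
    override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
  extends RpcEndpoint with Logging {

  // The largest serialized reply we are willing to push through Akka.
  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case GetMapOutputStatuses(shuffleId: Int) =>
      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
      if (mapOutputStatuses.length > maxAkkaFrameSize) {
        // Reply with a failure rather than letting an oversized message be dropped.
        context.sendFailure(new SparkException(
          s"Map output statuses were ${mapOutputStatuses.length} bytes which " +
            s"exceeds the max frame size ($maxAkkaFrameSize bytes)."))
      } else {
        context.reply(mapOutputStatuses)
      }

    case StopMapOutputTracker =>
      context.reply(true)
      stop()
  }
}
```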
3. The Abstract MapOutputTracker Class

3.1 Initialization and Maintained Structures

MapOutputTracker is an abstract class; the two concrete implementations discussed later are the master (driver) side and the worker (executor) side. They maintain different mapping tables: the master must maintain the global table, while a worker only keeps a local one.
Its declaration and doc comment are as follows:
```scala
/**
 * Class that keeps track of the location of the map output of
 * a stage. This is abstract because different versions of MapOutputTracker
 * (driver and executor) use different HashMap to store its metadata.
 */
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
```

  • A reference to the master RPC endpoint living on the driver
```scala
/** Set to the MapOutputTrackerMasterEndpoint living on the driver. */
var trackerEndpoint: RpcEndpointRef = _
```

  • Handles to the ShuffleMapTask outputs: from a handle one can reach the task, then the statuses and blocks it involves, and from there fetch or operate on the data (the concrete map types used by each subclass are sketched after this list)
```scala
/**
 * This HashMap has different behavior for the driver and the executors.
 *
 * On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
 * On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
 * driver's corresponding HashMap.
 *
 * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
 * thread-safe map.
 */
protected val mapStatuses: Map[Int, Array[MapStatus]]
```

  • A generation counter (epoch) and the lock guarding it (the accessors that use them are sketched after this list)
```scala
/**
 * Incremented every time a fetch fails so that client nodes know to clear
 * their cache of map output locations if this happens.
 */
protected var epoch: Long = 0
protected val epochLock = new AnyRef
```

  • Records which map output locations are currently being fetched
```scala
/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]
```
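As promised above, this is roughly how the two subclasses instantiate mapStatuses in the Spark 1.6 sources (the concrete collection types have changed across Spark versions):

```scala
// In MapOutputTrackerMaster: the authoritative global table, timestamped so
// that entries for old shuffles can be cleaned up.
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()

// In MapOutputTrackerWorker: a plain thread-safe local cache.
protected val mapStatuses: Map[Int, Array[MapStatus]] =
  new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
```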
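And the accessors built on epoch and epochLock, abridged from the same class; bumping the epoch is what invalidates the executor-side cache:

```scala
def getEpoch: Long = epochLock.synchronized { epoch }

/** Called on executors when a newer epoch arrives with a task. */
def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
  if (newEpoch > epoch) {
    logInfo(s"Updating epoch to $newEpoch and clearing cache")
    epoch = newEpoch
    mapStatuses.clear()  // stale locations must be re-fetched from the driver
  }
}
```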

3.2 Important Methods
  • getMapSizesByExecutorId
    Returns the basic information about the shuffle blocks a reducer needs: their status, location, and size (the helper it delegates to is sketched after this list)
```scala
/**
 * Called from executors to get the server URIs and output sizes for each shuffle block that
 * needs to be read from a given reduce task.
 *
 * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
 *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
 *         describing the shuffle blocks that are stored at that block manager.
 */
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
}

/**
 * Called from executors to get the server URIs and output sizes for each shuffle block that
 * needs to be read from a given range of map output partitions (startPartition is included but
 * endPartition is excluded from the range).
 *
 * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
 *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
 *         describing the shuffle blocks that are stored at that block manager.
 */
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
  val statuses = getStatuses(shuffleId)
  // Synchronize on the returned array because, on the driver, it gets mutated in place
  statuses.synchronized {
    return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
  }
}
```

  • getStatuses(shuffleId: Int)
    Obtains the basic statuses of the shuffle output, fetching them from the driver on a cache miss (see the sketch after this list)
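Both overloads above end in MapOutputTracker.convertMapStatuses, which groups the blocks by the BlockManager holding them; abridged from the companion object:

```scala
private def convertMapStatuses(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int,
    statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  assert(statuses != null)
  val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
  for ((status, mapId) <- statuses.zipWithIndex) {
    if (status == null) {
      // A missing entry means the map output was lost; fail so the stage is re-run.
      throw new MetadataFetchFailedException(
        shuffleId, startPartition, s"Missing an output location for shuffle $shuffleId")
    } else {
      for (part <- startPartition until endPartition) {
        // One ShuffleBlockId per (map task, reduce partition) pair, with its size.
        splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
          ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
      }
    }
  }
  splitsByAddress.toSeq
}
```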
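getStatuses is where the mapStatuses cache and the fetching set cooperate on an executor: a cache miss triggers an RPC to the driver, and the fetching set ensures only one thread per shuffle performs it. An abridged sketch, with logging and interrupt handling trimmed:

```scala
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
  val statuses = mapStatuses.get(shuffleId).orNull
  if (statuses != null) return statuses

  var fetchedStatuses: Array[MapStatus] = null
  fetching.synchronized {
    while (fetching.contains(shuffleId)) {
      fetching.wait()            // another thread is already fetching this shuffle
    }
    fetchedStatuses = mapStatuses.get(shuffleId).orNull
    if (fetchedStatuses == null) {
      fetching += shuffleId      // we won the race; other threads will wait on us
    }
  }

  if (fetchedStatuses == null) {
    try {
      // Ask the driver-side endpoint and cache the deserialized result locally.
      val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
      fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
      mapStatuses.put(shuffleId, fetchedStatuses)
    } finally {
      fetching.synchronized {
        fetching -= shuffleId
        fetching.notifyAll()     // wake up any threads waiting for this shuffle
      }
    }
  }

  if (fetchedStatuses != null) {
    fetchedStatuses
  } else {
    throw new MetadataFetchFailedException(
      shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
  }
}
```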
