Spark | In-Depth Analysis of Spark Internals (SparkContext Principles and Source Code Analysis)

1. SparkContext principles
1. Diagram:
(Figure: SparkContext initialization flow diagram)

2. SparkContext source code analysis
1. TaskScheduler creation:
SparkContext.scala

// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)

// A different TaskScheduler is created depending on the submit mode (master URL)
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  master match {
    // standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // TaskSchedulerImpl schedules tasks by operating on a SchedulerBackend for the
      // different kinds of clusters (standalone, YARN and Mesos).
      // It can also work in local mode by using a LocalBackend and setting the isLocal
      // parameter to true.
      // It handles common logic, such as deciding the scheduling order across multiple
      // jobs and launching speculative task execution.
      // Clients should first call initialize() and start(), and then submit task sets
      // through the runTasks() method.
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)
  }
}
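
Which branch createTaskScheduler() takes is decided purely by the master URL passed to SparkContext. As a minimal sketch (the app name and cluster address below are placeholders, not taken from the source), a driver submitted like this would match the SPARK_REGEX branch above and therefore get a TaskSchedulerImpl paired with a SparkDeploySchedulerBackend:

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneSubmitSketch {
  def main(args: Array[String]): Unit = {
    // "spark://host:7077" matches SPARK_REGEX, so SparkContext.createTaskScheduler
    // builds a TaskSchedulerImpl plus a SparkDeploySchedulerBackend for it.
    val conf = new SparkConf()
      .setAppName("standalone-submit-sketch")   // placeholder app name
      .setMaster("spark://master-host:7077")    // placeholder standalone master URL
    val sc = new SparkContext(conf)
    // ... build and run RDD jobs here ...
    sc.stop()
  }
}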

TaskSchedulerImpl.scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
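
The schedulingMode branch above is driven by the spark.scheduler.mode configuration (FIFO by default). A minimal sketch of switching a driver to the fair scheduler, assuming a local master just for brevity:

import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulingSketch {
  def main(args: Array[String]): Unit = {
    // With spark.scheduler.mode=FAIR, initialize() builds a FairSchedulableBuilder,
    // so concurrently submitted jobs share resources instead of running strictly FIFO.
    val conf = new SparkConf()
      .setAppName("fair-scheduling-sketch")
      .setMaster("local[*]")
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)
    // jobs submitted from multiple threads would now be scheduled through the fair pools
    sc.stop()
  }
}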

TaskScheduler startup:
TaskSchedulerImpl.scala
override def start() {
  // The key point: this calls start() on SparkDeploySchedulerBackend
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
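
The speculation branch in start() only becomes active when spark.speculation is set to true and the application is not running in local mode. A minimal configuration sketch (the master URL is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

object SpeculationSketch {
  def main(args: Array[String]): Unit = {
    // With spark.speculation=true, TaskSchedulerImpl.start() schedules the periodic
    // checkSpeculatableTasks() call shown above, which re-launches straggler tasks.
    val conf = new SparkConf()
      .setAppName("speculation-sketch")
      .setMaster("spark://master-host:7077")  // placeholder; the check is skipped in local mode
      .set("spark.speculation", "true")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}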

SparkDeploySchedulerBackend.scala
override def start() {
  super.start()

  // The endpoint for executors to talk to us
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // ApplicationDescription is very important: it describes everything about the current
  // application, including the maximum number of CPU cores it needs and how much memory
  // it needs on each slave.
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // Create the AppClient.
  // AppClient is the interface through which the application communicates with the Spark
  // cluster. It takes the Spark Master URLs, an ApplicationDescription and a listener for
  // cluster events, and invokes the listener's callbacks when those events occur.
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  client.start()

  waitForRegistration()
}
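
The executor launch Command built above picks up spark.executor.extraJavaOptions, spark.executor.extraClassPath and spark.executor.extraLibraryPath from the application's SparkConf. A hedged sketch of how a driver might populate them (the JVM flag and paths below are placeholders for illustration only):

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorLaunchConfSketch {
  def main(args: Array[String]): Unit = {
    // These values end up in the Command that SparkDeploySchedulerBackend.start()
    // builds for org.apache.spark.executor.CoarseGrainedExecutorBackend.
    val conf = new SparkConf()
      .setAppName("executor-launch-conf-sketch")
      .setMaster("spark://master-host:7077")                        // placeholder master URL
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")       // placeholder JVM flag
      .set("spark.executor.extraClassPath", "/opt/libs/extra.jar")  // placeholder classpath entry
    val sc = new SparkContext(conf)
    sc.stop()
  }
}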

2. DAGScheduler creation:
SparkContext.scala
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
  // DAGScheduler implements the high-level, stage-oriented scheduling layer. For each job
  // it computes a DAG (directed acyclic graph) of stages, tracks which RDD and stage
  // outputs have been materialized (i.e. written to disk, memory, etc.), and finds a
  // minimum-cost (optimal) schedule to run the job. It then submits stages as TaskSets to
  // the underlying TaskSchedulerImpl to run them (the tasks) on the cluster.
  // Besides building the stage DAG, it also determines the preferred locations to run each
  // task, based on the current cache status, and passes them to the underlying
  // TaskSchedulerImpl. In addition, it handles failures caused by lost shuffle output
  // files, in which case old stages may be resubmitted. Failures inside a stage that are
  // not caused by lost shuffle files are handled by the TaskScheduler, which retries each
  // task a number of times and only cancels the whole stage as a last resort.
  dagScheduler = new DAGScheduler(this)
} catch {
  case e: Exception => {
    try {
      stop()
    } finally {
      throw new SparkException("Error while constructing DAGScheduler", e)
    }
  }
}
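
To make the stage-splitting behaviour described in the comment concrete, here is a minimal word-count style job (the input data is made up for illustration): the DAGScheduler cuts the RDD graph at the reduceByKey shuffle boundary, producing two stages, and hands each stage to TaskSchedulerImpl as a TaskSet.

import org.apache.spark.{SparkConf, SparkContext}

object StageSplitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stage-split-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Stage 0: parallelize + map (narrow dependencies, no shuffle yet)
    // Stage 1: reduceByKey forces a shuffle, so the DAGScheduler cuts the DAG here
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()

    counts.foreach(println)
    sc.stop()
  }
}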

3. SparkUI creation:
SparkContext.scala
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
      env.securityManager, appName))
  } else {
    // For tests, do not enable the UI
    None
  }

SparkUI.scala
// Default port of the Spark web UI
val DEFAULT_PORT = 4040

def createLiveUI(
    sc: SparkContext,
    conf: SparkConf,
    listenerBus: SparkListenerBus,
    jobProgressListener: JobProgressListener,
    securityManager: SecurityManager,
    appName: String): SparkUI = {
  create(Some(sc), conf, listenerBus, securityManager, appName,
    jobProgressListener = Some(jobProgressListener))
}
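
Whether the live UI is created, and on which port it binds, can be steered from the driver's SparkConf. A minimal sketch (the port value below is just an example overriding DEFAULT_PORT, not something from the source):

import org.apache.spark.{SparkConf, SparkContext}

object UiConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ui-conf-sketch")
      .setMaster("local[*]")
      .set("spark.ui.enabled", "true")  // default; set to "false" to skip createLiveUI entirely
      .set("spark.ui.port", "4041")     // example port overriding DEFAULT_PORT = 4040
    val sc = new SparkContext(conf)
    // the web UI is served at http://localhost:4041 while this context is alive
    sc.stop()
  }
}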
