Spark Streaming source code: starting from start()

The entry point for everything is StreamingContext.start().

The key point here is that it kicks off scheduling by calling JobScheduler.start().

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            // the key point here: start() kicks off the scheduling
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
  }
}
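For orientation, this is the method a driver program hits when it calls ssc.start(). Below is a minimal sketch of such a driver (the app name, host and port are placeholders made up for this illustration, not anything from the source being discussed):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StartDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("start-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))    // 5-second batch interval

    // at least one output operation must be defined before start()
    ssc.socketTextStream("localhost", 9999).print()     // placeholder source

    ssc.start()              // enters the StreamingContext.start() shown above
    ssc.awaitTermination()   // block until the streaming computation stops
  }
}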

Now let's look at that start(), i.e. JobScheduler.start(). It creates a new EventLoop. Does that look familiar? We ran into EventLoop before in Spark's DAG scheduler. Its onReceive method is overridden to keep receiving events, and it delegates each one to processEvent for handling.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // a new EventLoop, familiar from Spark's DAG scheduler;
  // onReceive is overridden to hand every event to processEvent
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
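EventLoop itself is a small internal utility: a daemon thread drains a blocking queue and hands each event to onReceive. A simplified sketch of that pattern is shown below (an illustration only, not Spark's actual org.apache.spark.util.EventLoop):

import java.util.concurrent.LinkedBlockingDeque

// Simplified sketch of the event-loop pattern JobScheduler relies on.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take()    // block until an event arrives
          try onReceive(event) catch { case t: Throwable => onError(t) }
        }
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit  // JobScheduler's version delegates to processEvent
  protected def onError(e: Throwable): Unit
}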

An implementation class of EventLoop:
[image: EventLoop implementation class]

Doesn't this feel more and more like Spark Core?

// handling the received events
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}

// let's see how handleJobStart is implemented
private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
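The listenerBus.post(...) calls in handleJobStart are what eventually reach user-registered StreamingListeners. As a quick user-side illustration (a hedged sketch, the class name is mine), a listener can observe exactly the batch and output-operation events posted above:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted, StreamingListenerOutputOperationStarted}

// Logs the events that handleJobStart posts to the listener bus.
class BatchStartLogger extends StreamingListener {
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    println(s"Batch ${batchStarted.batchInfo.batchTime} started with " +
      s"${batchStarted.batchInfo.numRecords} records")
  }

  override def onOutputOperationStarted(
      started: StreamingListenerOutputOperationStarted): Unit = {
    println(s"Output operation ${started.outputOperationInfo.id} of batch " +
      s"${started.outputOperationInfo.batchTime} started")
  }
}

// Registered on the StreamingContext before start():
//   ssc.addStreamingListener(new BatchStartLogger)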

There is a JobSet here ("Class representing a set of Jobs belong to the same batch."), that is, the set of jobs belonging to the same batch.
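To make that concrete, here is a stripped-down sketch of the kind of state a JobSet keeps (an illustration with made-up types, not the actual Spark class): the batch time, the jobs that are still incomplete, and the processing start/end timestamps that handleJobStart and handleJobCompletion update.

import scala.collection.mutable

// Simplified sketch of a "set of jobs belonging to the same batch".
class SimpleJobSet(val time: Long, jobs: Seq[String]) {
  private val incompleteJobs = mutable.HashSet(jobs: _*)
  private var processingStartTime = -1L
  private var processingEndTime = -1L

  def hasStarted: Boolean = processingStartTime > 0
  def hasCompleted: Boolean = incompleteJobs.isEmpty

  def handleJobStart(job: String): Unit = {
    if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()
  }

  def handleJobCompletion(job: String): Unit = {
    incompleteJobs -= job
    if (hasCompleted) processingEndTime = System.currentTimeMillis()
  }

  // how long the batch took once its first job started running
  def processingDelay: Long = processingEndTime - processingStartTime
}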
JobScheduler.start() contains one key call:
jobGenerator.start()
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  // ... (same method body as shown above) ...
  receiverTracker.start()
  // this is the key point
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}

Now let's look at the key player here:
JobGenerator ("This class generates jobs from DStreams as well as drives checkpointing and cleaning up DStream metadata.")
1. It generates jobs.
2. It drives checkpointing and the cleanup of DStream metadata.
With a class like this, we habitually look for a start-related method, and of course there is one.
Generating jobs:
Here it checks whether we are recovering from a checkpoint; if not, it calls startFirstTime().
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  // check whether we are recovering from a checkpoint; if not, call startFirstTime()
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
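Whether ssc.isCheckpointPresent is true depends on how the context was built on the user side. A minimal, hedged sketch of the usual pattern follows (the checkpoint directory, host and port are placeholders): StreamingContext.getOrCreate rebuilds the context from the checkpoint if one exists, in which case this start() takes the restart() branch; otherwise the creating function runs and startFirstTime() is used.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  val checkpointDir = "/tmp/streaming-checkpoint"   // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)                             // enable checkpointing
    ssc.socketTextStream("localhost", 9999).count().print()   // placeholder pipeline
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds from the checkpoint if present (restart() path),
    // otherwise calls createContext() (startFirstTime() path).
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}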

Let's look at that method, which starts the generator for the first time:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

Two start() calls: one on graph (a DStreamGraph) and one on timer.
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
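The timer is a RecurringTimer that fires once per batch interval, and in the real code each tick posts a GenerateJobs event back onto the JobGenerator's event loop, which is what drives batch-by-batch job generation. Below is a simplified sketch of that recurring-timer pattern (an illustration, not the actual RecurringTimer source):

// A background thread that invokes a callback every `period` milliseconds.
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(s"SimpleRecurringTimer - $name") {
    setDaemon(true)
    override def run(): Unit = {
      var nextTime = (System.currentTimeMillis() / period + 1) * period  // align to the period
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)   // e.g. post GenerateJobs(new Time(nextTime)) to the event loop
        nextTime += period
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}

// JobGenerator wires its timer up roughly like:
//   new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
//     longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")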

Now the heavyweight enters the stage: DStreamGraph.
Before we get to it, a word about DStream first.
DStream is an abstract class whose full name is Discretized Stream, and its official doc comment is worth quoting.
It is the most basic abstraction in Spark Streaming. Why "discretized"? Think of the stream not as a continuous flow but as a tap under control: it releases water periodically, so what you get is effectively intermittent, another burst every so often. A DStream is a series of consecutive RDDs, so underneath it is still RDDs, produced periodically at the batch interval you configure. A DStream can also be derived by transforming another DStream, which means the map, filter and similar operations you know from Spark Core are largely available here as well (check the API for details). Because the underlying model is still RDDs, the programming model is the same, and if you know Spark Core this is easy to pick up.
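As a small illustration of that point, here is a hedged sketch (the `words` stream is a hypothetical input, not something defined in this article): the familiar RDD-style operators apply directly to a DStream, and foreachRDD drops down to the RDD of a single batch.

import org.apache.spark.streaming.dstream.DStream

object DStreamOps {
  // The RDD-style operators are available directly on the DStream:
  def wordCounts(words: DStream[String]): DStream[(String, Int)] =
    words
      .filter(_.nonEmpty)       // same shape as RDD.filter
      .map(word => (word, 1))   // same shape as RDD.map
      .reduceByKey(_ + _)       // from PairDStreamFunctions, via implicit conversion

  // Because a DStream is a sequence of RDDs, you can always work with the RDD of one batch:
  def logBatchSizes(counts: DStream[(String, Int)]): Unit =
    counts.foreachRDD { (rdd, time) =>
      println(s"batch $time contains ${rdd.count()} distinct words")
    }
}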
Summary of its characteristics:
- A list of other DStreams that the DStream depends on -- that is, the dependencies between DStreams
- A time interval at which the DStream generates an RDD -- self-explanatory
- A function that is used to generate an RDD after each time interval -- that is, the function that produces the RDDs
Here is where those statements come from:
/** Time interval after which the DStream generates an RDD */
def slideDuration: Duration

/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]

/** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)] through implicit conversions.
 *
 * A DStream internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
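Those three members are all a concrete DStream has to supply. As a hedged, simplified illustration (a made-up class, not one that ships with Spark), a trivial DStream that hands back the same RDD on every batch would look roughly like this, in the spirit of the built-in ConstantInputDStream:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// A DStream that returns the same RDD for every batch interval.
class ConstantDStream[T: ClassTag](
    ssc_ : StreamingContext,
    rdd: RDD[T],
    batchDuration: Duration) extends DStream[T](ssc_) {

  // no parent DStreams: this behaves like a source
  override def dependencies: List[DStream[_]] = List.empty

  // one RDD is generated per batch interval
  override def slideDuration: Duration = batchDuration

  // the "function used to generate an RDD after each time interval"
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}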

It has many implementation classes:
[image: DStream implementation classes]

And here we see an old acquaintance:
[image: a familiar Kafka-related DStream implementation]

I will analyze the Kafka integration separately later on.
Back to our DStreamGraph.start() method. The _.start() inside it is called on each input DStream, and its doc comment reads "Method called to start receiving data. Subclasses must implement this method.", so this is the method that starts data reception.
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart())
    numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
    inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
    // here _.start() is called on each input DStream
    inputStreams.par.foreach(_.start())
  }
}

Let's look at a concrete implementation of this start():
[image: a concrete start() implementation of an input DStream]
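What "start receiving data" ends up meaning for receiver-based input streams is that a Receiver gets run on an executor. As a hedged user-side sketch (the tick source is invented purely for illustration), a custom receiver only implements onStart/onStop and pushes records with store():

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal custom receiver: emits one "tick" record per second.
class TickReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    // Receive data on a separate thread so onStart() returns quickly.
    new Thread("tick-receiver") {
      override def run(): Unit = {
        var i = 0
        while (!isStopped()) {
          store(s"tick-$i")   // hand one record to Spark Streaming
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // nothing to clean up; the loop above exits once isStopped() is true
  }
}

// Used as: val ticks = ssc.receiverStream(new TickReceiver)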

