Spark Source Code Notes 3 - LiveListenerBus

Following note 2 on JobProgressListener, let's look at listenerBus: LiveListenerBus in the SparkContext constructor. Right after JobProgressListener is created in the SparkContext constructor, it is added to listenerBus, with the following code:

_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
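User code can hook into the same mechanism through SparkContext's public addSparkListener API, which registers the listener with this listenerBus. A minimal sketch, assuming a local Spark 2.x setup; the object name ListenerRegistrationDemo and the listener itself are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

object ListenerRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listener-demo"))

    // A user-defined listener: SparkListener provides empty default implementations,
    // so we only override the callbacks we care about.
    val myListener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
    }

    // Registers through the same listenerBus as JobProgressListener above.
    sc.addSparkListener(myListener)

    sc.parallelize(1 to 100).count()   // triggers SparkListenerJobStart / SparkListenerJobEnd events
    sc.stop()
  }
}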

  • Package location of LiveListenerBus.scala:
    package org.apache.spark.scheduler
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

  self =>

  import LiveListenerBus._

  // Cap the capacity of the event queue so we get an explicit error (rather than
  // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

  ......

  // A counter that represents the number of events produced and consumed in the queue
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire()
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event)
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

  /**
   * Start sending events to attached listeners.
   *
   * This first sends out all buffered events posted before this listener bus has started, then
   * listens for any additional events asynchronously while the listener bus is still running.
   * This should only be called once.
   */
  def start(): Unit = {
    if (started.compareAndSet(false, true)) {
      listenerThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }

    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      // Don't log too frequently
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }

  /**
   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
   * new events after stopping.
   */
  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    if (stopped.compareAndSet(false, true)) {
      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
      // `stop` is called.
      eventLock.release()
      listenerThread.join()
    } else {
      // Keep quiet
    }
  }
}

private[spark] object LiveListenerBus {
  // Allows for Context to check whether stop() call is made within listener thread
  val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)

  /** The thread name of Spark listener bus */
  val name = "SparkListenerBus"
}

The main job of LiveListenerBus is to asynchronously deliver SparkListenerEvents to the SparkListeners registered with it. It is a companion class, with a companion object, object LiveListenerBus.
LiveListenerBus internally maintains an eventQueue: LinkedBlockingQueue[SparkListenerEvent], a linked-list-based blocking queue that holds SparkListenerEvents. It also maintains a thread, listenerThread: Thread; calling start and stop on LiveListenerBus actually calls start and join (waits for this thread to die) on that thread. Access to the event queue is coordinated with an eventLock: Semaphore:
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)

Each time an event is successfully put into the event queue, eventLock.release() is called and the semaphore count goes up by one. Before taking an event, the listener thread calls eventLock.acquire(), blocking until a permit is available (count greater than zero), then decrements the count and takes one event to process. In addition, LiveListenerBus's stop function also calls eventLock.release(): this lets the listener thread perform one extra poll at the end, and handling that empty (null) result is what triggers the shutdown path:
// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
// `stop` is called.
eventLock.release()
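To make the producer/consumer handoff concrete, here is a small standalone sketch of the same pattern. This is not Spark code; all names (SemaphoreBusSketch, post, stop, etc.) are illustrative. One permit is released per enqueued event, and stop releases one extra permit so the consumer wakes up, polls null, and exits:

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of the LiveListenerBus handoff: one permit per queued event,
// plus one extra permit released at stop so the consumer polls null and exits.
object SemaphoreBusSketch {
  private val queue     = new LinkedBlockingQueue[String](10000)
  private val eventLock = new Semaphore(0)
  private val stopped   = new AtomicBoolean(false)

  private val consumer = new Thread("sketch-listener-bus") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        eventLock.acquire()                 // blocks until an event (or the stop signal) is available
        val event = queue.poll()
        if (event == null) return           // the extra release from stop(): nothing queued, so exit
        println(s"processing $event")       // stands in for postToAll(event)
      }
    }
  }

  def start(): Unit = consumer.start()

  def post(event: String): Unit =
    if (!stopped.get && queue.offer(event)) eventLock.release()  // one permit per enqueued event

  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    eventLock.release()                     // wake the consumer even if the queue is already drained
    consumer.join()
  }

  def main(args: Array[String]): Unit = {
    start()
    (1 to 5).foreach(i => post(s"event-$i"))
    stop()                                  // all five events are processed before stop returns
  }
}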

LiveListenerBus extends the trait SparkListenerBus.
  • SparkListenerBus is as follows:
package org.apache.spark.scheduler

import org.apache.spark.util.ListenerBus

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      ......
      case _ => listener.onOtherEvent(event)
    }
  }
}

The trait SparkListenerBus implements only one method, doPostEvent, which takes a concrete listener and an event, and uses pattern matching to call that listener's handler for each kind of event (the callbacks defined by the trait SparkListenerInterface):
protected override def doPostEvent(
    listener: SparkListenerInterface,
    event: SparkListenerEvent): Unit
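A minimal sketch of this dispatch style, with Event and Listener types invented for illustration rather than taken from Spark: the bus matches on the concrete event class and forwards to the corresponding callback, with a catch-all case for everything else.

// Illustrative sketch of doPostEvent-style dispatch: match on the concrete event type
// and forward it to the corresponding callback on the listener interface.
sealed trait Event
case class JobStart(jobId: Int)   extends Event
case class JobEnd(jobId: Int)     extends Event
case class Other(payload: String) extends Event

trait Listener {
  def onJobStart(e: JobStart): Unit = {}
  def onJobEnd(e: JobEnd): Unit = {}
  def onOtherEvent(e: Event): Unit = {}
}

object Dispatch {
  def doPostEvent(listener: Listener, event: Event): Unit = event match {
    case e: JobStart => listener.onJobStart(e)
    case e: JobEnd   => listener.onJobEnd(e)
    case e           => listener.onOtherEvent(e)
  }
}

In the real code, postToAll (inherited from ListenerBus, shown below) calls doPostEvent once per registered listener for every event drained from the queue.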

The trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent].
  • trait ListenerBus:
package org.apache.spark.util

......

/**
 * An event bus which posts events to its listeners.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Marked `private[spark]` for access in tests.
  private[spark] val listeners = new CopyOnWriteArrayList[L]

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listeners.add(listener)
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be
   * called in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listeners.remove(listener)
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  final def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listeners.iterator
    while (iter.hasNext) {
      val listener = iter.next()
      try {
        doPostEvent(listener, event)
      } catch {
        ......
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    ......
  }
}

ListenerBus internally maintains a private CopyOnWriteArrayList[L] named listeners; addListener and removeListener on ListenerBus simply call add and remove on listeners. These operations are thread-safe: inside CopyOnWriteArrayList, every mutating operation takes a lock and makes a fresh copy of the underlying array.
  • A brief excerpt from CopyOnWriteArrayList's own description:
package java.util.concurrent;
......
/**
 * A thread-safe variant of {@link java.util.ArrayList} in which all mutative
 * operations ({@code add}, {@code set}, and so on) are implemented by
 * making a fresh copy of the underlying array.
 * ......
 */
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
    ......
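This copy-on-write behavior is what lets postToAll iterate over listeners while another thread concurrently calls addListener: an iterator works on a snapshot of the array, so it never throws ConcurrentModificationException and never observes a half-built list. A small illustrative sketch (the object name and string elements are made up):

import java.util.concurrent.CopyOnWriteArrayList

// Illustrative sketch: iteration works on a snapshot of the underlying array,
// so concurrent addListener-style mutations never disturb an in-flight postToAll loop.
object CowListSketch {
  def main(args: Array[String]): Unit = {
    val listeners = new CopyOnWriteArrayList[String]()
    listeners.add("listenerA")
    listeners.add("listenerB")

    val iter = listeners.iterator()             // snapshot taken here
    listeners.add("listenerC")                  // mutation copies the array; the snapshot is unaffected

    while (iter.hasNext) println(iter.next())   // prints listenerA, listenerB only
    println(listeners.size())                   // 3: the new listener is visible to later iterations
  }
}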

The LinkedBlockingQueue seen in this note, along with its offer (enqueue) and poll (dequeue) operations, internally uses the reentrant lock ReentrantLock. Noting this for now; to be explored in a later note.
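As a small illustration of the two non-blocking calls the bus depends on (the object name and capacity below are made up for the demo): offer returns false when the bounded queue is full, which is the dropped-event path in post, and poll returns null when the queue is empty, which is the signal the listener thread interprets as stop.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative sketch: offer() returns false when the bounded queue is full (the
// "dropped event" path), and poll() returns null when the queue is empty (the
// signal the listener thread interprets as "the bus has been stopped").
object BoundedQueueSketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String](2)   // tiny capacity for the demo

    println(queue.offer("e1"))   // true
    println(queue.offer("e2"))   // true
    println(queue.offer("e3"))   // false: queue is at capacity, the event would be dropped

    println(queue.poll())        // e1
    println(queue.poll())        // e2
    println(queue.poll())        // null: queue is empty
  }
}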
