【Spark】Spark 事件总线

本篇结构:

  • 事件总线介绍
  • ListenerBus 继承体系
  • LiveListenerBus 详解
  • 流程总结
一、事件总线介绍 Spark 定义了一个特质 ListenerBus,可以接受事件并且将事件提交到对应事件的监听器。
该特征主要有一个 listeners 成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]。
该特征还有几个主要的函数:
  • addListener:添加 listener
  • doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
  • findListenersByClass:根据类型查找 listener 列表
  • postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
  • removeListener:删除 listener
  • removeListenerOnError:内部调用 removeListener,可由子类覆盖
【Spark】Spark 事件总线
文章图片

二、ListenerBus 继承体系 【Spark】Spark 事件总线
文章图片

上图是 spark 2.1.0 版本事件总线的继承关系,版本不同,会略有不同。
每个 ListenerBus 用于将不同的 Event 投递到不同的 Listener 中,下面以主要分析下 LiveListenerBus。
三、LiveListenerBus 详解 LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。
3.1、LiveListenerBus 中的属性:
// Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) if (queueSize <= 0) { throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") } queueSize }private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") .intConf .createWithDefault(10000)

  • eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;
// Indicate if `start()` is called private val started = new AtomicBoolean(false) // Indicate if `stop()` is called private val stopped = new AtomicBoolean(false)

  • started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
  • stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;
/** A counter for dropped events. It will be reset every time we log it. */ private val droppedEventsCounter = new AtomicLong(0L) /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L

  • droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;
  • lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;
// Indicate if we are processing some event // Guarded by `self` private var processingEvent = false

  • processingEvent:暗示当前正有事件在被 listenerThread 线程处理;
private val logDroppedEvent = new AtomicBoolean(false)

  • logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;
// A counter that represents the number of events produced and consumed in the queue private val eventLock = new Semaphore(0)

  • eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);
  • listenerThread:异步处理事件的线程;
3.2、异步事件处理线程
private val listenerThread = new Thread(name) { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire() self.synchronized { processingEvent = true } try { val event = eventQueue.poll if (event == null) { // Get out of the while loop and shutdown the daemon thread if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } postToAll(event) } finally { self.synchronized { processingEvent = false } } } } } }

代码不算复杂,主要逻辑是:
  • 设置为 daemon thread;
  • 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
  • 同步控制,将 processingEvent 设置为 true;
  • 从 eventQueue 中获取事件;
  • 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
  • 每次循环结束同步控制,将 processingEvent 设置为 false;
3.3、异步事件处理线程的事件来源
DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。
def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP logError(s"$name has already stopped! Dropping event $event") return } val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { onDropEvent(event) droppedEventsCounter.incrementAndGet() }val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { // There may be multiple threads trying to decrease droppedEventsCounter. // Use "compareAndSet" to make sure only one thread can win. // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and // then that thread will update it. if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }

  • 先判断 LiveListenerBus 是否停止,停止记录错误日志,返回;
  • 向eventQueue中添加事件:
    • 如果成功,就释放信号量,这时 listenerThread 中的 eventLock.acquire() 就可以后去信号量,从队列取出事件进行后续操作;
    • 如果失败,则移除事件 onDropEvent,并对删除事件计数器进行自增 droppedEventsCounter.incrementAndGet();
  • 如果有事件被删除,并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则重置 droppedEventsCounter 计算为0,并更新 lastReportTimestamp 为当前系统时间。
四、流程总结 用一张图总结下的 Spark 的事件总线大致的流程:
【Spark】Spark 事件总线
文章图片

五、参考资料 【【Spark】Spark 事件总线】这篇文章内容和 《spark内核设计的艺术架构设计与实现》 关于事件总线的描述章节相差不多,流程图也一样。之所以还要花费时间记录,是因为这样才更有感觉,正所谓“好记性,不如烂笔头”。

    推荐阅读