【Spark】Spark 事件总线
本篇结构:
- 事件总线介绍
- ListenerBus 继承体系
- LiveListenerBus 详解
- 流程总结
该特征主要有一个 listeners 成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]。
该特征还有几个主要的函数:
- addListener:添加 listener
- doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
- findListenersByClass:根据类型查找 listener 列表
- postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
- removeListener:删除 listener
- removeListenerOnError:内部调用 removeListener,可由子类覆盖
文章图片
二、ListenerBus 继承体系
文章图片
上图是 spark 2.1.0 版本事件总线的继承关系,版本不同,会略有不同。
每个 ListenerBus 用于将不同的 Event 投递到不同的 Listener 中,下面以主要分析下 LiveListenerBus。
三、LiveListenerBus 详解 LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。
3.1、LiveListenerBus 中的属性:
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.createWithDefault(10000)
- eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
- started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
- stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;
/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;
- lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
- processingEvent:暗示当前正有事件在被 listenerThread 线程处理;
private val logDroppedEvent = new AtomicBoolean(false)
- logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
- eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);
- listenerThread:异步处理事件的线程;
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}
代码不算复杂,主要逻辑是:
- 设置为 daemon thread;
- 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
- 同步控制,将 processingEvent 设置为 true;
- 从 eventQueue 中获取事件;
- 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
- 每次循环结束同步控制,将 processingEvent 设置为 false;
DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}
- 先判断 LiveListenerBus 是否停止,停止记录错误日志,返回;
- 向eventQueue中添加事件:
- 如果成功,就释放信号量,这时 listenerThread 中的 eventLock.acquire() 就可以后去信号量,从队列取出事件进行后续操作;
- 如果失败,则移除事件 onDropEvent,并对删除事件计数器进行自增 droppedEventsCounter.incrementAndGet();
- 如果有事件被删除,并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则重置 droppedEventsCounter 计算为0,并更新 lastReportTimestamp 为当前系统时间。
文章图片
五、参考资料 【【Spark】Spark 事件总线】这篇文章内容和 《spark内核设计的艺术架构设计与实现》 关于事件总线的描述章节相差不多,流程图也一样。之所以还要花费时间记录,是因为这样才更有感觉,正所谓“好记性,不如烂笔头”。
推荐阅读
- 宽容谁
- 我要做大厨
- 增长黑客的海盗法则
- 画画吗()
- 2019-02-13——今天谈梦想()
- 远去的风筝
- 三十年后的广场舞大爷
- 叙述作文
- 20190302|20190302 复盘翻盘
- 学无止境,人生还很长