大数据处理|Spark事件总线接口ListenerBus

Spark事件总线接口ListenerBus 介绍 ListenerBus是事件总线接口。它是一个trait类,它定义了事件和处理事件的listener之间的接口。该接口主要包括以下函数:
(1)添加listener
(2)删除listener
(3)给特定listener发送事件
(4)把事件发送给所有的listener
(5)通过特定的类查找listener
它创建了一个线程安全的链表,把所有的listener都放到该链表中。
实现 这是ListenerBus的申明,如下代码:

/** * An event bus which posts events to its listeners. */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

【大数据处理|Spark事件总线接口ListenerBus】所有的listener都被放到java的CopyOnWriteArrayList链表中。这个是一个线程安全的List。
// Marked `private[spark]` for access in tests. private[spark] val listeners = new CopyOnWriteArrayList[L]

可以向这个队列中添加和删除listener,提供的方法如下:
/** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ final def addListener(listener: L): Unit = { listeners.add(listener) }/** * Remove a listener and it won't receive any events. This method is thread-safe and can be called * in any thread. */ final def removeListener(listener: L): Unit = { listeners.remove(listener) }

若有多个listener,就需要有一个方法来进行全局发送,把所有的事件发送给所有的listener,方法如下:
/** * Post the event to all registered listeners. The `postToAll` caller should guarantee calling * `postToAll` in the same thread for all events. */ final def postToAll(event: E): Unit = { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. // To avoid the wrapper cost, here we use // Java Iterator directly. val iter = listeners.iterator while (iter.hasNext) { val listener = iter.next() try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } }

该方法会遍历所有的listener,并把事件发送给所有的listener。
也可以把event(事件)发送给给定的listener,方法如下:
/** * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same * thread for all listeners. */ protected def doPostEvent(listener: L, event: E): Unit

另外还提供了,通过特定类来查找listener的方法:
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq }

    推荐阅读