大数据处理|Spark事件总线接口ListenerBus
Spark事件总线接口ListenerBus
介绍 ListenerBus是事件总线接口。它是一个trait类,它定义了事件和处理事件的listener之间的接口。该接口主要包括以下函数:
(1)添加listener
(2)删除listener
(3)给特定listener发送事件
(4)把事件发送给所有的listener
(5)通过特定的类查找listener
它创建了一个线程安全的链表,把所有的listener都放到该链表中。
实现 这是ListenerBus的申明,如下代码:
/**
* An event bus which posts events to its listeners.
*/
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
【大数据处理|Spark事件总线接口ListenerBus】所有的listener都被放到java的CopyOnWriteArrayList链表中。这个是一个线程安全的List。
// Marked `private[spark]` for access in tests.
private[spark] val listeners = new CopyOnWriteArrayList[L]
可以向这个队列中添加和删除listener,提供的方法如下:
/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)
}/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
}
若有多个listener,就需要有一个方法来进行全局发送,把所有的事件发送给所有的listener,方法如下:
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
final def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently.
// To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}
该方法会遍历所有的listener,并把事件发送给所有的listener。
也可以把event(事件)发送给给定的listener,方法如下:
/**
* Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
* thread for all listeners.
*/
protected def doPostEvent(listener: L, event: E): Unit
另外还提供了,通过特定类来查找listener的方法:
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}
推荐阅读
- 我要做大厨
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量
- 知识
- 三十年后的广场舞大爷
- 奔向你的城市
- 村里的故事|村里的故事 --赵大头
- 期刊|期刊 | 国内核心期刊之(北大核心)
- 华为旁!大社区、地铁新盘,佳兆业城市广场五期!
- 汇讲-勇于突破
- 每日一话(49)——一位清华教授在朋友圈给大学生的9条建议