【源码阅读】EventBus源码阅读

EventBus 是一个实现了订阅者/发布者模式的框架。在阅读EventBus 之前,我们先来看一下EventBus如何使用。

  • EventBus 使用例子
public interface Report { // 上报信息 void report (String msg); }public SmsReport implements Report { @Subscribe @Override public void report (String msg) { System.out.println("send " + msg + " by sms"); } }public class App { public static void main (String[] args) { Report report = new SmsReport(); // 创建EventBus 实例 EventBus eventBus = new EventBus(); // 注册订阅者 eventBus.register(report); // 发布消息 String msg = "有内鬼,取消交易!"; eventBus.port(msg); } }


EventBus 使用是比较简单的。从上面我们可以知道,加了Subscribe 注解的方法相当于订阅者。 EventBus提供了register 函数,给订阅者进行注册。另外提供了 post函数,即发布消息。
我们回过头来,看一下订阅者/发布者模式的实现方式。然后再来看一下EventBus 是如何实现的。
  • 订阅者/发布者模式实现
public interface ISubscriber { // 处理消息 void handle (String msg); }public interface IPublisher { // 注册 void register (ISubscriber subscriber); // 发布消息 void post(String msg); }public class SubscriberA implements ISubsciber { @Override public void handle (String msg) { System.out.println(msg); } }public class PublisherCenter implements IPublisher { // 订阅者列表 private List subscriberList = new ArrayList<>(); @Override public void register (ISubscriber subscriber) { subscriberList.add(subscriber); }@Override public void post (String msg) { subscriberList.foreach(subscriber -> { subscriber.handle(msg); }); } }public class App { public static void main (String[] args) { //订阅者 ISubscriber subscriber = new SubscriberA(); IPublisher pub = new PublisherCenter(); // 注册 pub.register(subscriber); // 发布消息 String msg = "有内鬼,取消交易!"; pub.post(msg); } }


从这里,我们可以看出。自己现实的订阅者/发布者模式,在使用上是和EventBus 是比较相似的。不同的地方是,EventBus通过Subscribe注解来表示订阅者。
从自己的实现上来看,在发布者中是需要维护一张订阅者列表的,即PublishCenter中的subscriberList。另外,在PublishCenter发布消息时,会遍历订阅者列表,把消息发给每一个订阅者进行处理。到这里,我们大概知道订阅者/发布者模式的实现原理,那么,我们可以看看EventBus中是如何进行实现的。

EventBus 源码阅读

  • EventBus 类结构
【源码阅读】EventBus源码阅读
文章图片

【【源码阅读】EventBus源码阅读】从EventBus 成员以及方法命名来看。我们可以猜测SubscriberRegistry 相当于我们自己现实中的订阅者列表 subscriberList, EventBus 这里应该是进行了一个封装。
register 和 post 函数,和我们自己现实的一样。一个应该是用于注册订阅者,一个用于发布消息。

  • register 注册函数
我们可以从register 入手,看下EventBus 是如何进行订阅者注册的。
// EventBus.regiser public void register(Object object) { // 调用了SubscriberRegistry 的方法register subscribers.register(object); }// SubscribeRegistry.register void register(Object listener) { // 找出所有订阅者 Multimap, Subscriber> listenerMethods = findAllSubscribers(listener); // 把新的订阅者添加到subscribers中 for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) { Class eventType = entry.getKey(); Collection eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); }eventSubscribers.addAll(eventMethodsInListener); } }

从上面的代码中,我们可以知道,EventBus把注册的行为委托给了SubscribeRegistry 进行注册。在SubscribeRegister的注册函数中。有几个点我们是可以进行了解的。

第一: findAllSubscribers 函数做了什么,listenerMethods 这个数据表示什么?
private Multimap, Subscriber> findAllSubscribers(Object listener) { Multimap, Subscriber> methodsInListener = HashMultimap.create(); // 获取lintener类信息 Class clazz = listener.getClass(); // 获取加了Subscribe 注解的方法 for (Method method : getAnnotatedMethods(clazz)) { // 获取方法的参数类型 Class[] parameterTypes = method.getParameterTypes(); Class eventType = parameterTypes[0]; // 存放参数类型->订阅者映射信息 methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }

从上面的代码中,我们可以知道,findAllSubscribers 主要是通过反射获取订阅者。返回订阅者订阅的消息类型和订阅者的映射关系数据。

第二: CopyOnWriteArraySet 数据结构。这里用了CopyOnWriteArraySet 来存放订阅者,这比起常见的ArrayList、HashSet这些好什么好处?
CopyOnWriteArraySet 主要是采用了写时复制的做法。不在原数组上进行增删操作。而是复制元数组,在复制的数组上进行增删操作。当操作完后,把原数组的指针指到新数组上即可。
这样做的目的主要是为了提高并发读的性能(不需要加锁)。

到这里,我们基本知道EventBus是如何进行订阅者注册的了:
当用户进行订阅者注册的时候,EventBus把注册行为委托给SubscribeRegistry 进行注册(SubscribeRegistry相当于我们自己现实的PublishCenter)。
在SubscibeRegistry中,会通过反射获取到listener上哪些方法加了Subscibe注解。提取这些方法封装为订阅者。然后存放到订阅者列表(subscribes)中。这里订阅者列表的数据结构是Map的。 key 是消息类型, value是订阅key的订阅者列表。
private final ConcurrentMap, CopyOnWriteArraySet> subscribers;


  • post 函数
post 函数,发布消息。从我们自己的现实中,我们可以知道。当发布消息时,会遍历订阅者列表,并把消息发布给订阅者。我们这里看下EventBus 中post做了什么。
public void post(Object event) { // 取出订阅事件event的所有订阅者(迭代器) Iterator eventSubscribers = subscribers.getSubscribers(event); // 分发执行 if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }


这里post函数,跟我们自己PublishCenter中实现的类似。都是取出订阅者,然后把消息发给订阅者进行执行。不过EventBus这里把分发消息的行为进行了抽象封装。
这里可以根据不同的场景,实现不同的dispatcher。EventBus提供了三种实现。
ImmediateDispatcher:同步执行,与我们PublishCenter中的一样。
void dispatch(Object event, Iterator subscribers) { checkNotNull(event); while (subscribers.hasNext()) { subscribers.next().dispatchEvent(event); } }


LegacyAsyncDispatcher: 异步执行,把事件和订阅者进行封装,放到一个全局的阻塞队列中。之后从队列中取订阅者进行执行。这里如果有多个线程进行发布消息。那么不保证按提交的顺序进行执行。比如有两个发布者线程A、B同时发布了消息。那么这里发布者线程A、B需要通知的订阅者是放到了同一个阻塞队列中的。发布者线程A和B再从阻塞队列中取订阅者进行处理。
private final ConcurrentLinkedQueue queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); }EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }


PerThreadQueueDispatcher: 在提交的发布者线程中进行执行,这里同样采用了队列来存放订阅者。与LegacyAsyncDispatcher不同的是,PerThreadQueueDispatcher只发布自己的事件。不同发布者线程之间互不影响。(这里主要是通过ThreadLocal来实现的)。
/** Per-thread queue of events to dispatch. */ private final ThreadLocal> queue = new ThreadLocal>() { @Override protected Queue initialValue() { return Queues.newArrayDeque(); } }; /** Per-thread dispatch state, used to avoid reentrant event dispatching. */ private final ThreadLocal dispatching = new ThreadLocal() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }


到这里,我们大概知道了EventBus中post是如何做的了。EventBus发布消息时,会取出消息上注册的订阅者。然后采取不同的分发策略,把消息发给订阅者进行执行。

  • 其他知识点
在EventBus 中,我们可以看到很多设计模式的例子。比如枚举单例、委托等等。另外也有一些其他的知识点,比如反射、CopyOnWrite等等。

下面就是枚举单例的用法,这里实现了Executor接口。在执行订阅者逻辑时,根据不同场景,也是可以有不同执行策略的。比如同步执行、异步执行等等。
enum DirectExecutor implements Executor { INSTANCE; @Override public void execute(Runnable command) { command.run(); }@Override public String toString() { return "MoreExecutors.directExecutor()"; } }


  • 阅读源码收获
  1. 大体上了解了EventBus整个的源码实现,对订阅者/发布者模式有了进一步的认识。
  2. 学到了新知识点,比如CopyOnWriteArraySet数据结构。
  3. 学到了一些设计模式的用法,比如枚举单例、委托。
  4. 初步了解抽象的做法。比如消息分发(抽象了dispatcher接口)。



    推荐阅读