EventBus 源码试读(二)

转载请标明出处:【Gypsophila 的博客】 http://blog.csdn.net/astro_gypsophila/article/details/75269287
EventBus 基本使用和进阶配置
EventBus 源码试读(一)
EventBus 源码试读(二)
EventBus 源码试读(三)
这篇文章将从 post() 作为切入点,了解关于事件发布流程、对应不同 threadMode 的处理以及 sticky 事件特性。

// EventBus 类中 public void post(Object event) { // 取出当前线程的 PostingThreadState 对象,并且得到其事件集合 // 将要 post 的事件加入集合 PostingThreadState postingState = currentPostingThreadState.get(); List eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { // 根据 looper 来判断是否当前是处于主线程 postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { // 开始分发事件 while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { // 重置 postingState.isPosting = false; postingState.isMainThread = false; } } }
由于在后面会设置到在什么线程中调用起订阅者中的响应方法,这边需要判断是否当前线程是主线程。真正的调用执行逻辑是在 postSingleEvent 中。
// EventBus 类中 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class eventClass = event.getClass(); boolean subscriptionFound = false; // 先不考虑事件继承 if (eventInheritance) { List> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { // 简单起见,从主干流程,进入到这里 subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } // 没有找到 if (!subscriptionFound) { // 根据设置,是否打印日志 if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { // 外部程序可以选择是否订阅这种事件(没有找到订阅者的事件) post(new NoSubscriberEvent(this, event)); } } }

这边我们只需要关注 15 行。
// EventBus 类中 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) { // Subscription 是订阅者对象、响应函数的包装 CopyOnWriteArrayList subscriptions; synchronized (this) { // 根据事件的类类型,查找是否有对应的事件类型的 Subscription 对象的集合 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { // 真正执行的入口 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { // 取消事件的分发,退出循环 break; } } return true; } return false; }// EventBus 类中 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { // 对于 threadMode 四种值所代表的不同含义,需要清楚 case POSTING: // 即在 post 发出后,同一线程中同步调用的 invokeSubscriber(subscription, event); break; // 响应函数被要求在主线程中被调用 case MAIN: // 如果当前线程是主线程 if (isMainThread) { // 效果同样是同一线程中同步调用的 invokeSubscriber(subscription, event); } else { // 否则就是在其他线程中发出了 post ,并且要求响应函数需要在主线程中被调用 mainThreadPoster.enqueue(subscription, event); } break; // 响应函数被要求在一个后台线程被调用起 case BACKGROUND: if (isMainThread) { // 当前是主线程,将利用线程池去使用其他线程调用响应函数 backgroundPoster.enqueue(subscription, event); } else { // 当前不是主线程,就直接同步调用 invokeSubscriber(subscription, event); } break; // 响应函数被要求在一个独立的线程被调用起 case ASYNC: // 将利用线程池去使用其他线程调用响应函数 asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }// EventBus 类中 // 反射调用响应函数 void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }

先看默认的 threadMode 值 POSTING,它是直接调用 void invokeSubscriber(Subscription subscription, Object event),方法内部是利用反射调用的响应函数。而其他 threadMode 值的流程执行,最终都会执行到 void invokeSubscriber(Subscription subscription, Object event) ,不过是利用 Handler 进行线程切换,或者是利用线程池进行执行任务罢了。
threadMode 为 MAIN 时,若发出 post 时所在线程为主线程,那么就是同步调用;否则进入 mainThreadPoster.enqueue(subscription, event); 逻辑,进入查看相关代码。
// HandlerPoster 类中,它是 Handler 子类 void enqueue(Subscription subscription, Object event) { // 得到一个待发送的对象,包含了 Subscription 包装对象 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { // 对象入队列 queue.enqueue(pendingPost); // 如果当前 handler 没有处于活动状态 if (!handlerActive) { handlerActive = true; // 发送消息,handler 处理队列中的待发送对象 // 这边能够实现切换线程,就是因为所关联的 looper 对象为主线程的 looper if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }// HandlerPoster 类中 @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { // 循环从队列取出待发送的对象 PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); // 队列为空(可能为都一一处理完后),将 handlerActive 置为 false if (pendingPost == null) { handlerActive = false; return; } } } // 此时已经完成线程切换为主线程了 // 最终也是会走到 EventBus 中反射调用响应函数中去的 eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }

threadMode 为 BACKGROUND 时,若发出 post 时所在线程为主线程,则需要去利用线程池去使用其他线程调用响应函数;否则就同步调用。
// BackgroundPoster 类中,它实现 Runnable 接口 public void enqueue(Subscription subscription, Object event) { // 得到一个待发送的对象,包含了 Subscription 包装对象 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; // 使用线程池去执行本对象的 run 方法 // 线程池存在默认的配置,也可以使用 EventBusBuilder 传入一个外部配置 eventBus.getExecutorService().execute(this); } } }// BackgroundPoster 类中 @Override public void run() { try { try { while (true) { // 循环从队列取出待发送的对象,并且调用响应函数 // 直到队列为空,退出循环; 另外需要注意这个等待1秒 PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } // 此时处于子线程 // 最终也是会走到 EventBus 中反射调用响应函数中去的 eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }

threadMode 为 ASYNC 时的逻辑也很简单。
// AsyncPoster 类中,它实现了 Runnable 接口 public void enqueue(Subscription subscription, Object event) { // 得到一个待发送的对象,包含了 Subscription 包装对象 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); // 对象入队列 queue.enqueue(pendingPost); // 立即执行 eventBus.getExecutorService().execute(this); }// AsyncPoster 类中 @Override public void run() { // 马上取出对象,并且执行响应函数 PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }

从以上代码可以看出有点类似与 threadMode 为 BACKGROUND 的逻辑,但区别还是有的。
threadMode 为 BACKGROUND 时,它在加入待发送对象的时候,会判断后台线程是否正在运行,如果正在运行,就默默入队列就好,反正会慢慢执行到队列尾部,然后完成所有的响应函数调用,就自动退出 run 方法了;如果不在运行,然后 eventBus.getExecutorService().execute(this); 利用线程池分配一个线程去开启运行,所以是众多的待发送对象处于同一个后台线程。
threadMode 为 ASYNC 时,就只是利用线程池每次得到一个独立的线程去执行任务。
另外想额外提一下,关于 sticky 事件的特性。还记得在 EventBus 类中的 subscribe() 方法对响应函数是否为 sticky 事件的处理:
if (subscriberMethod.sticky) { // 先不考虑事件继承 if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List). Set, Object>> entries = stickyEvents.entrySet(); for (Map.Entry, Object> entry : entries) { Class candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); // sticky 事件则直接调用响应函数 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }

【EventBus 源码试读(二)】以上的处理就是在 EventBus 基本使用和进阶配置 中 gif 中 sticky 事件的表现效果的原因。
如果 stickyEvents map 中能够找到订阅类中注解 sticky 为 true 的事件类型,那么当该订阅类刚注册时,仍然可以收到最新的此类型 sticky 事件。它是由于在该订阅类注册前,有地方调用了 public void postSticky(Object event),此时就将 event 加入到了 stickyEvents map 中。

    推荐阅读