Java|EventBus使用

EventBus简介 EventBus,事件总线,guava基于观察者模式的优雅实现。对于事件监听和发布订阅模式,使用guava eventbus可以省去开发人员很多事情,不用在去定义那些复杂的类或接口来实现事件发布与订阅。在guava eventbus中,开发人员只需要在订阅方法上添加上@Subscribe注解就可以了,这样一来就省去了大量共用的编码工作。guava eventbus提供了同步或者异步消息发布的实现,用户可以根据需要编写相关的代码。使用异步消息时需要通过一个Executor来辅助。
【Java|EventBus使用】EventBus中的事件可以了是任意类型的,事件分发的时候只需要根据订阅参数类型来分发消息,如果编码中,多个订阅事件类型上存在类型继承的关系,则当前的事件会分发到多个不同的订阅者上,这一点大家在使用的时候可能要仔细处理,在不需要重复处理的消息上就要做好细节处理了。另外,guava eventbus中默认订阅方法为线程不安全的,在异步调度时会自动将其包装成线程安全的方法。对于一般线程安全的实现上,可以通过@AllowConcurrentEvents注解来标识。如果发布了某些未被订阅的事件,可以通过DeadEvent获取。
EventBus同步串行分发所有的事件,所以最好能保持消息处理方法的轻量化,否则会很耗时间。如果有需要处理重量级的事件,可以使用异步事件总线AsyncEventBus,它跟EventBus功能上一样,通过将ExecutorService对象作为构造函数的参数,并对处理事件的方法添加@AllowConcurrentEvents注解,它能使用线程池来异步并发处理事件,大大加快处理速度。如果处理事件的方法没有@AllowConcurrentEvents注解,纵然使用线程池的不同线程来处理不同的实践,但却只能串行执行,无法加快处理速度。
订阅事件 定义一个单个参数的以给定事件类型为参数的共有方法,并对该方法加 @Subscribe 注解
以上述类的实例作为参数来注册EventBus

private static class EventListener1 {@Subscribe public void onEvent(EventA event) { System.out.println("[onEvent]我订阅的是A事件,已收到: " + event); }@Subscribe public void listen(EventA event) { System.out.println("[listen]我订阅的是A事件,已收到: " + event); } } public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new EventListener()); System.out.println("发布事件A"); eventBus.post(new EventA()); }

测试代码输出:
发布事件 A
[listen]我订阅的是A事件,已收到: EventA
[onEvent]我订阅的是A事件,已收到: EventA
从执行结果可以看出,只要是订阅了EventA事件的方法都会对该事件进行相应的处理。
异步事件总线
ExecutorService threadPool = Executors.newCachedThreadPool(); EventBus eventBusAnother = new AsyncEventBus(threadPool); eventBusAnother.register(new EventListener()); for (int i = 0; i < 100; i++) { eventBusAnother.post(new EventC()); }

@Subscribe @AllowConcurrentEvents //不加该注解的话对该EventD的处理会串行执行,很慢 public void onEvent(EventD event) throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.format("%s sleep 一会%n", threadName); Thread.sleep(1000); System.out.println("我订阅的是D事件,已收到: " + event); System.out.format("%s sleep 完成%n", threadName); }

DeadEvent 如果发布了某些未被订阅的事件,可以通过DeadEvent获取。
System.out.println("发布灵异事件"); eventBus.post(new EventBusTest());

@Subscribe public void onEvent(DeadEvent event) { System.out.println("发布了错误的事件: " + event.getEvent()); }

对具有继承关系的事件处理 多个订阅事件类型上存在类型继承的关系,则当前的事件会分发到多个不同的订阅者上
private static class EventB extends EventA { @Override public String toString() { return "EventB"; } }

发布事件 B
我订阅的是B事件,已收到: EventB
我订阅的是A事件,已收到: EventB
总的参考代码
package com.example.event; import com.google.common.eventbus.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author mnmlist@163.com * @date 2017/06/08 * @time 11:02 */ public class EventBusTest {public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new EventListener()); System.out.println("发布事件A"); eventBus.post(new EventA()); System.out.println("发布事件 B "); eventBus.post(new EventB()); System.out.println("发布事件 X "); eventBus.post(new EventX()); System.out.println("发布灵异事件"); eventBus.post(new EventBusTest()); System.out.println("发布事件C"); ExecutorService threadPool = Executors.newCachedThreadPool(); EventBus eventBusAnother = new AsyncEventBus(threadPool); eventBusAnother.register(new EventListener()); for (int i = 0; i < 100; i++) { eventBusAnother.post(new EventC()); } System.out.println("发布事件D"); for (int i = 0; i < 100; i++) { eventBusAnother.post(new EventD()); } threadPool.shutdown(); }private static class EventA { @Override public String toString() { return "EventA"; } }private static class EventB extends EventA { @Override public String toString() { return "EventB"; } }private static class EventC { @Override public String toString() { return "EventC"; } }private static class EventD { @Override public String toString() { return "EventD"; } }private static class EventX { @Override public String toString() { return "EventX"; } }private static class EventListener {@Subscribe public void onEvent(EventA event) { System.out.println("我订阅的是A事件,已收到: " + event); }@Subscribe public void listen(EventA event) { System.out.println("[listen]我订阅的是A事件,已收到: " + event); }@Subscribe public void onEvent(EventB event) { System.out.println("我订阅的是B事件,已收到: " + event); }@Subscribe @AllowConcurrentEvents public void onEvent(EventC event) throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.format("%s sleep 一会%n", threadName); Thread.sleep(1000); System.out.println("我订阅的是C事件,已收到: " + event); System.out.format("%s sleep 完成%n", threadName); }@Subscribe @AllowConcurrentEvents public void onEvent(EventD event) throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.format("%s sleep 一会%n", threadName); Thread.sleep(1000); System.out.println("我订阅的是D事件,已收到: " + event); System.out.format("%s sleep 完成%n", threadName); }@Subscribe public void onEvent(EventX event) { System.out.println("我订阅的是X事件,已收到: " + event); }@Subscribe public void onEvent(DeadEvent event) { System.out.println("发布了错误的事件: " + event.getEvent()); } } }

    推荐阅读