RxJava源码分析(一)基本的数据流分析(无背压)

引言 关于RxJava2的用法网上的资料很多,这里我们只学习它的实现原理。本文专题目的:
1.知道源头(Observable)是如何将数据发送出去的。
2.知道终点(Observer)是如何接收到数据的。
3.何时将源头和终点关联起来的
今天我们先从最简单的无背压(Observable)的create操作符说起,来解决前三个问题。
样例

//1.创建被观察者,生产事件 final Observable observable = Observable.create(new ObservableOnSubscribe() { @Override //2.订阅的时候发送事件 public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); //emitter.onError(new Throwable("haha")); emitter.onComplete(); //onComplete事件发送后,后面的所有事件无效,且后面不能发送错误事件 emitter.onNext(3); } }); //3.定义观察者 Observer observer = new Observer() {@Override public void onSubscribe(Disposable d) { Log.e(TAG, "开始采用subscribe连接"); mDisposable = d; }@Override public void onNext(Integer value) { Log.e(TAG, "对Next事件" + value + "作出响应"); }@Override public void onError(Throwable e) { Log.e(TAG, "对Error事件作出响应"); }@Override public void onComplete() { Log.e(TAG, "对Complete事件作出响应"); } }; //4建立联系 observable.subscribe(observer);

我们看到出现了一下几个角色:
  1. Observable:被观察者,是数据的源头,通过subscribe订阅被观察者;
  2. ObservableOnSubscribe:从代码结构上看,Observable的构造方法需要它,且持有subscribe方法,这里暂时理解为观察者和被观察者的中间件,具体作用后面再看;
  3. ObservableEmitter:顾名思义,是数据发射器,被观察者通过它发送事件;
  4. Observer:被观察者,数据接受者,持有onNext、onError、onComplete、onSubscribe方法。
Observable
public abstract class Observable implements ObservableSource { ... }

实现了ObservableSource接口:
public interface ObservableSource { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(Observer observer); }

接口很简单,提供了订阅观察者的功能,Observable中该方法的实现后面再看。我们先看看create操作符干了些啥:
public abstract class Observable implements ObservableSource { ... @SchedulerSupport(SchedulerSupport.NONE) public static Observable create(ObservableOnSubscribe source) { //判空 ObjectHelper.requireNonNull(source, "source is null"); //构造ObservableCreate对象 return RxJavaPlugins.onAssembly(new ObservableCreate(source)); } ... }

最后通过用户构造的ObservableOnSubscribe对象,返回了ObservableCreate对象。我们先看看ObservableOnSubscribe。
数据发射封装-ObservableOnSubscribe
public interface ObservableOnSubscribe { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(ObservableEmitter e) throws Exception; }

【RxJava源码分析(一)基本的数据流分析(无背压)】是个接口,用户发射数据就是在这个接口实现中完成,具体见样例代码,这里流一个疑问:入参ObservableEmitter是怎么来的,别急后面马上会讲到!
create操作符的产物-ObservableCreate
然后再看Observable的子类ObservableCreate,根据名字我们可以猜到它是由create操作符创建的被观察者:
public final class ObservableCreate extends Observable { final ObservableOnSubscribe source; //用户实现具体的数据发射操作public ObservableCreate(ObservableOnSubscribe source) { this.source = source; }@Override protected void subscribeActual(Observer observer) { CreateEmitter parent = new CreateEmitter(observer); observer.onSubscribe(parent); try { //调用中间件ObservableOnSubscribe的订阅方法,开始调用发射数据的代码了! //说明1 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... }

这里的source由用户构造,实现发射数据操作,subscribeActual方法是核心,当订阅观察者是,最终会执行subscribeActual方法,后面会具体说明,不过看方法名也应该能猜到。
前面我们讲ObservableOnSubscribe的subscribe方法时,关于入参的来源留下了一个疑问,这里看说明1的代码:入参parent类型为CreateEmitter,很明显它必然是ObservableEmitter的子类或者子接口。
事件发射器--ObservableEmitter
public interface ObservableEmitter extends Emitter { ... }public interface Emitter { /** * Signal a normal value. * @param value the value to signal, not null */ void onNext(T value); /** * Signal a Throwable exception. * @param error the Throwable to signal, not null */ void onError(Throwable error); /** * Signal a completion. */ void onComplete(); }

可见发射器接口提供了发射数据的功能,在回过头来看看CreateEmitter,它是ObservableCreate的内部类.
public final class ObservableCreate extends Observable { ... ... static final class CreateEmitter extends AtomicReference implements ObservableEmitter, Disposable {private static final long serialVersionUID = -3434801548987643227L; //持有观察者,发射器的发送数据的方法其实是调用观察者对应的方法 final Observer observer; CreateEmitter(Observer observer) { this.observer = observer; }@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //调用观察者的接收数据的方法 observer.onNext(t); } }@Override public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } } else { RxJavaPlugins.onError(t); } }@Override public void onComplete() { if (!isDisposed()) { try { //发送完成事件后,断开连接,不接受后序事件 observer.onComplete(); } finally { dispose(); } } } ... } }

饶了半天,我们终于找到被观察者被调用的地方了,用户调用发射器的发送数据的方法最终会通过ObservableCreate中的CreateEmitter实现调用,而CreateEmitter最终又会调用观察者的接收数据方法,到此为止,下游接收数据的流程如下:
1.create操作符通过ObservableOnSubscribe对象构造ObservableCreate对象;
  1. ObservableCreate在执行订阅方法subscribeActual时,通过Observer对象构造发射器CreateEmitter;
  2. CreateEmitter发射数据最终会调用Observer对应的接收数据方法。
Observable的订阅方法 上面我们理解了接收数据的流程,下面我们瞅瞅Observable和Observer建立联系的订阅方法:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //subscribeActual核心代码!!! subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }

lei了lei了!看到subscribeActual方法就像遇到亲人了,之前我们了解到Create操作法创建的是ObservableCreate对象,这里用户执行订阅方法时会调用subscribeActual,我们再回头看看ObservableCreate的subscribeActual实现:
@Override protected void subscribeActual(Observer observer) { //根据Observer构造发射器 CreateEmitter parent = new CreateEmitter(observer); observer.onSubscribe(parent); try { //这里由用户实现发射数据操作 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }

读到这里,发现整个数据的产生和接收终于打通,订阅方法通过 source.subscribe(parent)由用户发射数据,在本样例中就是:
final Observable observable = Observable.create(new ObservableOnSubscribe() { @Override //2.订阅的时候发送事件 public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); //emitter.onError(new Throwable("haha")); emitter.onComplete(); //onComplete事件发送后,后面的所有事件无效,且后面不能发送错误事件 emitter.onNext(3); } });

emitter参数就是CreateEmitter类型发射器parent。相信看到这里,整个数据的流程应该比较清晰了。数据流向如下:
Observable订阅Observer--> Observable执行subscribeActual方法--> ObservableOnSubscribe执行subscribe方法--> ObservableEmitter执行发射数据方法--> Observer执行接收数据方法。
我们可以看到,RxJava规定了数据从Observable到Observer的统一流程,至于用户发送什么数据、按什么顺序发都通过中间件ObservableOnSubscribe和ObservableEmitter实现。

    推荐阅读