RxJava2.0(四)谈一谈基础功能源码实现

前言
我们在使用RxJava的时候最常用的功能就是写一个被观察者、一个观察者。在被观察者中发射数据,在观察者中接收数据,最后用subscribe将两者给订阅起来实现最基础的功能。例如下面这种:

//被观察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("aa"); e.onNext("bb"); e.onNext("cc"); e.onNext("dd"); e.onComplete(); } }); //观察者 Observer observer = new Observer() { @Override public void onSubscribe(Disposable d) { //TODO 初始化数据 d.dispose(); }@Override public void onNext(String value) { //TODO 接收被观察者发送的数据 Log.i("result:",value); }@Override public void onError(Throwable e) { //TODO 错误}@Override public void onComplete() { //TODO 完成之后回调 } }; //订阅 observable.subscribe(observer); 复制代码

那么在这种情况下,被观察者是如何发送数据给观察者?观察者又是如何接收数据?两者又是如何被subscribe订阅起来的呢?下面我们通过源码的分析来查看这一切的操作流程。
Observable.creat()
首先,在创建被观察者的时候,一般来说都是通过Observable.creat()来创建。那么我们进入creat()方法去看看里面做了什么操作。
@SchedulerSupport(SchedulerSupport.NONE) public static Observable create(ObservableOnSubscribe source) {//非空判断 ObjectHelper.requireNonNull(source, "source is null"); //返回一个Observable return RxJavaPlugins.onAssembly(new ObservableCreate(source)); } 复制代码

我们跳过非空判断逻辑,直接查看return。这里会通过RxJavaPlugins.onAssembly()返回一个Observable对象。那么我们跳进onAssembly去查看里面是如何进行Observable的创建的。
public static Observable onAssembly(Observable source) {...//返回传入的参数对象 return source; } 复制代码

这里其实没做什么很特别的操作,仅仅只是返回了参数对象。也就是说我们现在应该返回去重点研究的是这个参数对象source。而这个source根据前面的源码查看,可以看到其实是new ObservableCreate()。我们跳进这里查看
ObservableCreat()
//ObservableCreat继承了Observable,为被观察者Observable的子类 public final class ObservableCreate extends Observable { final ObservableOnSubscribe source; //构造方法,传入参数ObservableOnSubscribe,也就是我们在里面进行onNext、onComplete与onError的方法。 public ObservableCreate(ObservableOnSubscribe source) { this.source = source; }//此方法是在被订阅subscribe时候才调用,具体后面再说。 @Override protected void subscribeActual(Observer observer) { CreateEmitter parent = new CreateEmitter(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } } 复制代码

代码进行了一些删减。
通过上面代码逻辑注释可以看到:
1.ObservableCreat为Observable的子类,因为他拥有Observable的全部特性。
2.在ObservableCreat构造方法中传入了ObservableOnSubscribe,这个具体的作用我们下面讲。
3.有一个subscribeActual方法,这个方法其实是后面用作订阅的方法subscribe来实现的具体方式。
现在我们再来看看传入ObservableCreat中参数ObservableOnSubscribe
public interface ObservableOnSubscribe {/** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(ObservableEmitter e) throws Exception; } 复制代码

发现ObservableOnSubscribe其实是一个接口,而这个接口里有一个方法,专门用来实现我们的向观察者发送消息的方法。到此,被观察者在进行creat()的源码分析完毕,我们来总结一下。
总结:
1.在进行creat的时候,内部返回了一个Observable,而这个Observable实际上是一个继承了ObserVable的ObservableCreat类
2.ObserVable的ObservableCreat构造方法中传入了一个接口ObservableOnSubscribe,我们一般进行数据的发送都是通过这个接口中的subscribe方法里的ObservableEmitter来进行onNext、onComplete与onError。
Obsevable.subscribe()
public final void subscribe(Observer observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try {//获取observer observer = RxJavaPlugins.onSubscribe(this, observer); //非空判断 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //订阅方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码

抛开上面的逻辑判断不谈,我们直接看订阅方法subscribeActual()。不知道大家还记不记得,在我们讲Observable.creat的时候,在ObservableCreat这个类里面有两个用到的方法,一个是构造方法,传入接口ObservableOnSubscribe,另外一个是subscribeActual。而现在Observable.subscribe实际上就是在执行这个方法。
@Override protected void subscribeActual(Observer observer) { //实例化CreatEmitter对象 CreateEmitter parent = new CreateEmitter(observer); //执行观察者中的onSubscribe方法 //onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程 //和subscribeOn()与observeOn()无关! observer.onSubscribe(parent); try { //真正的订阅方法 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码

我们在来看下这方法中实例化CreatEmitter对象的这个类。
static final class CreateEmitter extends AtomicReference implements ObservableEmitter, Disposable {private static final long serialVersionUID = -3434801548987643227L; final Observer observer; CreateEmitter(Observer observer) { this.observer = observer; }@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //当没有被取消订阅的时候就执行onNext()方法用于发送数据 observer.onNext(t); } }@Override public void onError(Throwable t) { //出现错误时调用这个方法,用于抛出异常,并且在抛出之后的finally中调用dispose用于取消订阅 if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } } else { RxJavaPlugins.onError(t); } }@Override public void onComplete() { //判断如果没有执行disposed方法就调用onComplete并且dispose。这也就是为什么onComplete与onError为什么只会执行其中一个 if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } }复制代码

里面的逻辑可以说是一点都不复杂,就是我们平时经常使用的onNext、onComplete与onError方法。
onNext():先判断发送的消息是否为null,如果为空则调用onError方法来抛出异常。若不为空并且并未取消订阅,则发送数据。
onError():出现错误的时候执行这个方法。当抛去异常之后通过finally强行执行dispose()方法,来强制结束掉订阅。
onComplete():判断如果没有执行disposed方法就调用onComplete并且dispose。这也就是为什么onComplete与onError为什么只会执行其中一个。
总结
分析一下各个类的职责:
Observable :个人理解是装饰器模式下的基类,实际上所有操作都是Observable的子类进行的实现
ObservableOnSubscribe: 接口,定义了数据源的发射行为
ObservableCreate: 装饰器模式的具体体现,内部存储了数据源的发射事件,和subscribe订阅事件
ObservableEmitter: 数据源发射器,内部存储了observer
Observer: 接收到数据源后的回调(比如打印数据等)
1.Observable.create(),实例化ObservableCreate和ObservableOnSubscribe,并存储数据源发射行为,准备发射(我已经准备好数据源,等待被订阅)
2.Observable.subscribe(),实例化ObservableEmitter(发射器ObservableEmitter准备好!数据发射后,数据处理方式Observer已准备好!)
3.执行Observer.onSubscribe()回调,ObservableEmitter作为Disposable参数传入
4.执行ObservableOnSubscribe.subscribe()方法(ObservableEmitter发射数据,ObservableEmitter内部的Observer处理数据)
具体其他的一些操作符的用法,请参考我的github:RxJavaDemo
有兴趣可以关注我的小专栏,学习更多知识:小专栏
RxJava2.0(四)谈一谈基础功能源码实现
文章图片

【RxJava2.0(四)谈一谈基础功能源码实现】

    推荐阅读