RxJava详解之操作符执行原理(五)

RxJava详解之操作符执行原理(五) 上一篇文章介绍了RxJava的执行原理。下面继续介绍一下操作符的执行原理,但是操作符太多了,这里用map来进行说明。

Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext("Hello "); subscriber.onNext("World !"); subscriber.onCompleted(); } }); observable.map(new Func1() { @Override public String call(String s) { return "say" + s; } }); observable.subscribe(new Subscriber() { @Override public void onCompleted() {}@Override public void onError(Throwable e) {}@Override public void onNext(String s) { Log.i("@@@", s); } });

执行结果很显然是say Hellosay World !
我们直接进入Observable.map()方法的源码:
public final Observable map(Func1 func) { return create(new OnSubscribeMap(this, func)); }public static Observable create(OnSubscribe f) { return new Observable(RxJavaHooks.onCreate(f)); }

map的内部调用了create()方法,而create()方法的源码我们再上一个版本已经介绍了,也就是说map内部会创建一个新的Observable对象,而且用一个新的OnSubscribeMap对象作为参数。
OnSubscribeMap()对象的参数分别是之前通过create()方法创建的Observable对象,以及map中传递过来的Func1类的对象。接下来就直接看OnSubscribeMap类的源码,他实现了OnSubscribe接口,并重写了call()方法:
/** * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of * this transformation as a new {@code Observable}. */ public final class OnSubscribeMap implements OnSubscribe { // 最初Observable.create()创建的Observable对象 final Observable source; // map方法传递过来的func1对象,它是一个转换功能 final Func1 transformer; public OnSubscribeMap(Observable source, Func1 transformer) { this.source = source; this.transformer = transformer; }@Override public void call(final Subscriber o) { MapSubscriber parent = new MapSubscriber(o, transformer); // 把新创建的MapSubscriber添加到Observable.subscribe(subscribe)方法传递的参数subscriber中 o.add(parent); // unsafeSubscribe是subscribe方法的一个安全性不高的操作,可以简单理解为subscribe方法 source.unsafeSubscribe(parent); }static final class MapSubscriber extends Subscriber {final Subscriber actual; final Func1 mapper; boolean done; public MapSubscriber(Subscriber actual, Func1 mapper) { this.actual = actual; this.mapper = mapper; }@Override public void onNext(T t) { R result; try { // 先会执行以下转换函数的call方法,然后将结果再传递给Subscribe对象调用它的onNext方法 result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; }actual.onNext(result); }@Override public void onError(Throwable e) { if (done) { RxJavaHooks.onError(e); return; } done = true; actual.onError(e); }@Override public void onCompleted() { if (done) { return; } actual.onCompleted(); }@Override public void setProducer(Producer p) { actual.setProducer(p); } } }

【RxJava详解之操作符执行原理(五)】因为在执行observable.subscribe(subscriber)方法会调用到call()方法,这里看一下call()方法的核心:
MapSubscriber parent = new MapSubscriber(o, transformer); // 把新创建的MapSubscriber添加到Observable.subscribe(subscribe)方法传递的参数subscriber中 o.add(parent); // unsafeSubscribe是subscribe方法的一个安全性不高的操作,可以简单理解为subscribe方法,注意这里传递的是parent,也就是先创建的MapSubscriber对象,而这里的source是谁呢? 它是最初Observable.create创建的Observable对象 source.unsafeSubscribe(parent);

add()unsasfeSubscribe()方法如下:
private final SubscriptionList subscriptions; public final void add(Subscription s) { subscriptions.add(s); }public final Subscription unsafeSubscribe(Subscriber subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } return Subscriptions.unsubscribed(); } }

这一块代码就很简单了,因为和前面一篇我们分析的subscribe()方法类似,相当于直接调用了最初Observable.create()创建的Observable对象的call(subscriber)方法,而这里的subscriber又是我们创建的MapSubscriber的子类,所以这里相当于调用了MapSubscriber类中的onNext()onComplete()onError()方法:
static final class MapSubscriber extends Subscriber {final Subscriber actual; final Func1 mapper; boolean done; public MapSubscriber(Subscriber actual, Func1 mapper) { this.actual = actual; this.mapper = mapper; }@Override public void onNext(T t) { R result; try { // 先会执行以下转换函数的call方法,这个就是我们把Hello修改为say Hello的部分 result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } // 将转换函数的call方法的执行结果交给最初的Subscriber.onNext()方法的参数来执行 actual.onNext(result); } ... }

乱不乱?
梳理一下:
  • 我们把不使用map操作符时正常的操作创建的ObservableSubscriber分别称为1号。
  • map又会通过create分辨创建一个Observable2号和Subscriber2号,当我们执行map的时候,会最终执行到Subscriber2号的onNext()方法中,而该方法内部会先执行一些转换操作,然后将执行完的结果作为参数传递给并调用最初的Subscriber1号的onNext()方法。懂不? 多看两遍,这里有点绕。
更多内容请看下一篇文章RxJava详解(六)
更多精彩文章请见:Github AndroidNote,欢迎Star

    推荐阅读