开源框架解读--RxLifeCycle解析

一、介绍 RxLifecycle目的:解决RxJava使用中的内存泄漏问题。
例如,当使用RxJava订阅并执行耗时任务后,当Activityfinish时,如果耗时任务还未完成,没有及时取消订阅,就会导致Activity无法被回收,从而引发内存泄漏。
为了解决这个问题,就产生了RxLifecycle,让RxJava变得有生命周期感知,使得其能及时取消订阅,避免出现内存泄漏问题。
二、使用 首先来介绍下RxLifecycle的使用。
1.添加依赖

implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1' implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.1' implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'

2.继承容器类
Activity/Fragment需要继承RxAppCompatActivity/RxFragment,主要支持如下几种容器类:
开源框架解读--RxLifeCycle解析
文章图片

只需要在项目中针对base类的容器中继承实现对应的Rx类即可,这一步主要是对生命周期的回调事件进行监听。
3.绑定容器生命周期
Activity为例,主要有如下两种方法:
bindUntilEvent(@NonNull ActivityEvent event) bindToLifecycle()

针对Fragment也有同样的两种方法,只是方法名会有所不同。
下面详细介绍这两种方法的区别:
bindUntilEvent
该方法指定在哪个生命周期方法调用时取消订阅。
其中ActivityEvent是一个枚举类,对应于Activity的生命周期。
public enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

具体使用示例:
override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Unsubscribing subscription from onDestory()") } .compose(bindUntilEvent(ActivityEvent.DESTROY)) .subscribe { Log.i(TAG, "Started in onCreate(), running until in onDestroy(): $it") } }

指定在生命周期onDestory()时,取消订阅。
bindToLifecycle
在某个生命周期进行绑定,在对应的生命周期进行订阅解除。
具体使用示例:
override fun onResume() { super.onResume() Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Unsubscribing subscription from onPause()") } .compose(bindToLifecycle()) .subscribe { Log.i(TAG, "Started in onResume(), running until in onPause(): $it") } }

onResume()进行绑定订阅,则在onPause()进行解除订阅,生命周期是两两对应的。
三、原理解析 1.compose
首先来了解一下compose操作符。
compose(bindToLifecycle()) compose(bindUntilEvent(ActivityEvent.DESTROY))

如上所示,两种绑定生命周期的方式,都是通过compose操作符进行实现的。
compose一般情况下可以配合Transformer使用,以实现将一种类型的Observable转换成另一种类型的Observable,保证调用的链式结构。
那么接下来看该操作符在RxLifecycle中的应用,从bindToLifecyclebindUntilEvent入手。
2.BehaviorSubject
public abstract class RxAppCompatActivity extends AppCompatActivity implements LifecycleProvider {private final BehaviorSubject lifecycleSubject = BehaviorSubject.create(); @Override @NonNull @CheckResult public final Observable lifecycle() { return lifecycleSubject.hide(); }@Override @NonNull @CheckResult public final LifecycleTransformer bindUntilEvent(@NonNull ActivityEvent event) { return RxLifecycle.bindUntilEvent(lifecycleSubject, event); }@Override @NonNull @CheckResult public final LifecycleTransformer bindToLifecycle() { return RxLifecycleAndroid.bindActivity(lifecycleSubject); }@Override @CallSuper protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); lifecycleSubject.onNext(ActivityEvent.CREATE); }@Override @CallSuper protected void onStart() { super.onStart(); lifecycleSubject.onNext(ActivityEvent.START); }@Override @CallSuper protected void onResume() { super.onResume(); lifecycleSubject.onNext(ActivityEvent.RESUME); }@Override @CallSuper protected void onPause() { lifecycleSubject.onNext(ActivityEvent.PAUSE); super.onPause(); }@Override @CallSuper protected void onStop() { lifecycleSubject.onNext(ActivityEvent.STOP); super.onStop(); }@Override @CallSuper protected void onDestroy() { lifecycleSubject.onNext(ActivityEvent.DESTROY); super.onDestroy(); } }

RxAppCompatActivity中有一个关键对象BehaviorSubject
BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。如下图:
开源框架解读--RxLifeCycle解析
文章图片

所以lifecycleSubject会根据绑定订阅的时期,不断发送接下来的生命周期事件ActivityEvent
3.LifecycleTransformer
接下来继续看源码,bindToLifecyclebindUntilEvent都返回了一个LifecycleTransformer对象,那么LifecycleTransformer到底有什么用?
@ParametersAreNonnullByDefault public final class LifecycleTransformer implements ObservableTransformer, FlowableTransformer, SingleTransformer, MaybeTransformer, CompletableTransformer { final Observable observable; LifecycleTransformer(Observable observable) { checkNotNull(observable, "observable == null"); this.observable = observable; }@Override public ObservableSource apply(Observable upstream) { return upstream.takeUntil(observable); }@Override public Publisher apply(Flowable upstream) { return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST)); }@Override public SingleSource apply(Single upstream) { return upstream.takeUntil(observable.firstOrError()); }@Override public MaybeSource apply(Maybe upstream) { return upstream.takeUntil(observable.firstElement()); }@Override public CompletableSource apply(Completable upstream) { return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE)); }@Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; }LifecycleTransformer that = (LifecycleTransformer) o; return observable.equals(that.observable); }@Override public int hashCode() { return observable.hashCode(); }@Override public String toString() { return "LifecycleTransformer{" + "observable=" + observable + '}'; } }

LifecycleTransformer实现了各种Transformer接口,能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe对象。正好配合上文的compose操作符,使用在链式调用中。
4.takeUntil
接下来到了关键了,LifecycleTransformer到底把原来的Observable对象转换成了什么样子?
这就需要了解takeUntil操作符了!
开源框架解读--RxLifeCycle解析
文章图片

当第二个Observable发射了一项数据或者终止时,丢弃原Observable发射的任何数据。所谓的第二个Observable,即传入takeUntil中的Observable对象。
理解了该操作符的作用,那么你可能就明白了,RxLifecycle就是通过监听第二个Observable发射的数据,来解除订阅。
那么这第二个Observable是谁?
不就是在创建LifecycleTransformer的时候传入构造函数中的嘛,那就来寻找一下什么时候创建的该对象即可。
从头开始捋一捋:
public final LifecycleTransformer bindUntilEvent(@NonNull ActivityEvent event) { return RxLifecycle.bindUntilEvent(lifecycleSubject, event); }

该方法返回了LifecycleTransformer对象,继续向下追溯。
public static LifecycleTransformer bindUntilEvent(@Nonnull final Observable lifecycle, @Nonnull final R event) { checkNotNull(lifecycle, "lifecycle == null"); checkNotNull(event, "event == null"); return bind(takeUntilEvent(lifecycle, event)); }private static Observable takeUntilEvent(final Observable lifecycle, final R event) { return lifecycle.filter(new Predicate() { @Override public boolean test(R lifecycleEvent) throws Exception { return lifecycleEvent.equals(event); } }); }

继续追踪,马上接近真相。
public static LifecycleTransformer bind(@Nonnull final Observable lifecycle) { return new LifecycleTransformer<>(lifecycle); }

在该方法中创建了该对象,并传入了一个Observable对象,通过上面方法即可知道该对象就是BehaviorSubject对象。
那么该对象在什么时候发送第一次数据呢?
这就要看上面的takeUntilEvent方法了。
关键在这一句lifecycleEvent.equals(event),只有当BehaviorSubject发送的ActivityEvent的值等于解除绑定的生命周期时,才会发送第一次数据。那么当发送第一次数据时,根据上面的分析就会解除订阅的绑定。
那么针对bindToLifecycle方法,是进行怎样的操作,使得在对应的生命周期进行解除订阅呢?
还是继续看源码。
public final LifecycleTransformer bindToLifecycle() { return RxLifecycleAndroid.bindActivity(lifecycleSubject); } public static LifecycleTransformer bindActivity(@NonNull final Observable lifecycle) { return bind(lifecycle, ACTIVITY_LIFECYCLE); }

其中ACTIVITY_LIFECYCLE为:
private static final Function ACTIVITY_LIFECYCLE = new Function() { @Override public ActivityEvent apply(ActivityEvent lastEvent) throws Exception { switch (lastEvent) { case CREATE: return ActivityEvent.DESTROY; case START: return ActivityEvent.STOP; case RESUME: return ActivityEvent.PAUSE; case PAUSE: return ActivityEvent.STOP; case STOP: return ActivityEvent.DESTROY; case DESTROY: throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it."); default: throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented"); } } };

该函数的功能是会根据传入的生命周期事件,返回对应的生命周期,如CREATEDESTROY。看来通过该函数就可以实现在对应生命周期解绑了。
不过还需要一系列操作符的协助,继续看源码。
public static LifecycleTransformer bind(@Nonnull Observable lifecycle, @Nonnull final Function correspondingEvents) { checkNotNull(lifecycle, "lifecycle == null"); checkNotNull(correspondingEvents, "correspondingEvents == null"); return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)); }private static Observable takeUntilCorrespondingEvent(final Observable lifecycle, final Function correspondingEvents) { return Observable.combineLatest( lifecycle.take(1).map(correspondingEvents), lifecycle.skip(1), new BiFunction() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }) .onErrorReturn(Functions.RESUME_FUNCTION) .filter(Functions.SHOULD_COMPLETE); }

详细看一下takeUntilCorrespondingEvent方法。
5.take
首先看一下take操作符,很简单。
take(int)用一个整数n作为一个参数,只发射前面的n项,如下图:
开源框架解读--RxLifeCycle解析
文章图片

那么对应lifecycle.take(1).map(correspondingEvents),即获取发送的第一个生命周期事件,再通过上面对应的函数,转换为响应的生命周期。如果在onCreate中进行绑定,那么第一个发送的就是CREATE,返回的就是对应的DESTORY
6.skip
skip(int)忽略Observable发射的前n项数据
开源框架解读--RxLifeCycle解析
文章图片

lifecycle.skip(1),如果在onCreate中进行绑定,那么剩余的就是STARTRESUMEPAUSESTOPDESTROY
7. combineLatest
最后还需要一个关键的操作符combineLatest,来完成对应生命周期的解除订阅。
combineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:
  • 所有的Observable都发射过数据。
  • 满足上面条件的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。
具体示例,如下图所示:
开源框架解读--RxLifeCycle解析
文章图片

按照第三个参数的函数,将lifecycle.take(1).map(correspondingEvents)lifecycle.skip(1),进行combine
new BiFunction() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }

那么结果是
false,false,false,false,true

之后的onErrorReturnfilter是对异常的处理和判断是否应该结束订阅:
//异常处理 static final Function RESUME_FUNCTION = new Function() { @Override public Boolean apply(Throwable throwable) throws Exception { if (throwable instanceof OutsideLifecycleException) { return true; }//noinspection ThrowableResultOfMethodCallIgnored Exceptions.propagate(throwable); return false; } }; //是否应该取消订阅,依赖于上游的boolean static final Predicate SHOULD_COMPLETE = new Predicate() { @Override public boolean test(Boolean shouldComplete) throws Exception { return shouldComplete; } };

所以,按照上面的例子,如果在onCreate()方法中进行绑定,那么在onDestory()方法中就会对应的解除订阅。
四、总结 通过上面的分析,可以了解RxLifecycle的使用以及原理。
【开源框架解读--RxLifeCycle解析】学习RxLifecycle的过程中,更加体会到了对于观察者模式的使用,以及RxJava操作符的强大,各种操作符帮我们实现一些列的转换。

    推荐阅读