Android|RxJava2详解(三)--调度器
调度器(Scheduler) ReactiveX通过对线程调度的封装,让开发者无需关注线程管理、线程同步、线程安全、并发数据结构和非阻塞IO等底层实现,而Scheduler
是RxJava异步和并行计算的关键。
RxJava借鉴了Iterable
/Iterator
模式的思想,定义了一套Scheduler
/Worker
API。RxJava的Scheduler
不进行任何调度的工作,但它负责创建Worker
,Worker
负责实际调度,无论是直接调度还是递归调度。此外Worker
还实现了Subscription
接口,所以它可以被取消订阅,这会取消所有还未执行的任务,此后也不会再接受新的任务(尽可能)。这对于操作符(例如重复任务)使用Scheduler
非常有用,如果下游取消订阅了整个链条,就能一次取消所有定时的任务。
Scheduler
/Worker
需要满足以下的要求:
- 所有的方法都需要是线程安全的;
Worker
需要保证即时、串行提交的任务按照先进先出(FIFO)的顺序被执行;Worker
需要尽可能保证被取消订阅时要取消还未执行的任务;- 取消订阅一个
Worker
不能影响同一个Scheduler
的其他Worker
;
Observer
的方法被串行调用是一样的。 除了上面必须的要求,下面几点特性如果能具备也是非常好的:
- 一个被
Worker
调度的任务最好不要切换线程执行(hopping threads),保证在一个任务只在一个线程内执行能提升性能(避免线程切换的开销)。 - 串行发起的延迟任务,如果延迟时间相同,最好也能按照 FIFO 的顺序执行,并发调度的任务不做此要求。
Scheduler
实现最好用单线程的线程池来支持每个Worker
,而这也正是标准Scheduler
的实现方案:底层的ScheduledExecutorService
保证了上面的特性。SubscribeOn
subscribeOn()
的目的就是,确保调用subscribe()
函数的副作用代码(执行额外的代码)在另外的(参数指定的)线程中执行。然而首先几乎没有官方的 RxJava 代码会在自己的线程执行这些副作用代码;其次你也可以在自定义的Observable
中执行副作用代码,无论是通过create()
函数来实现,还是通过SyncOnSubscribe
和fromCallable()
API 来实现。 那我们为什么需要把副作用代码移到其他的线程中执行呢?主要的使用场景是在当前线程进行网络请求、数据库访问或者其他任何涉及阻塞的操作。让一个Tomcat的工作线程阻塞住并不是什么大问题(当然我们也可以通过响应式方式改进这种情况),但在 Swing 应用中阻塞了事件分发线程(Event Dispatch Thread,EDT),或者在Android中阻塞了主线程,就会对用户体验造成不利的影响了。
因此,如果源头会在被订阅时立即执行一些操作,我们希望这些操作在其他的线程执行。通常我们可以把对
subscribe()
的调用以及后续整个过程的所有操作都提交到一个ExecutorService
上,但这时我们会面临取消订阅和Subscriber
分离的问题。随着越来越多(越来越复杂)的操作需要异步地取消订阅,用这样的方式处理所有的情况将会变得很不方便。 幸运的是,我们可以把这一逻辑抽象成一个操作符:
subscribeOn()
。 关于
subscribeOn
最常见的两个问题就是:如果使用了两次subscribeOn
(直接或者通过其他操作符间接使用两次),会发生什么?为什么第二次使用subscribeOn
无法再次修改subscribe
执行的线程? 源头被订阅之后执行的代码,将在最先使用的(代码上最靠近源头的)
subscribeOn()
操作符指定的线程上执行,而且后续的subscribeOn()
都无法改变这一结果。这就是为什么基于Rx的API不能在返回Observable
的时候提前使用subscribeOn()
或者提供指定Scheduler
选项的原因。ObserveOn
observeOn()
的目的是确保所有发出的数据/通知都在指定的线程中被接收。RxJava默认是同步的,即onXXX()
是在同一个线程中串行调用的:for (int i = 0;
i < 1000;
i++) {
MapSubscriber.onNext(i) {
FilterSubscriber.onNext(i) {
TakeSubscriber.onNext(i) {
MySubscriber.onNext(i);
}
}
}
}
在很多场景下,我们需要把
onNext()
的调用(以及其后的所有链式调用)转移到另一个线程中。例如,可能生成map()
的输入是很快的,但是map时的计算非常耗时,有可能会阻塞GUI线程。又例如,我们可能有些在后台线程中执行的任务(数据库、网络访问,或者耗时的计算),需要把结果在GUI中进行展示,很多GUI框架只允许在特定线程中修改GUI内容。 从概念上来说,
observeOn
通过调度一个任务,把源Observable
的onXXX()
调度到指定的调度器(scheduler)上。这样,下游接收(执行)onXXX()
时,就是在指定的调度器上,但接收的是同样的值:ExecutorService exec = Executors.newSingleThreadedExecutor();
IObservable observeOn = o -> {
source.subscribe(new Observer() {
@Override
public void onNext(T t) {
exec.submit(() -> o.onNext(t));
}@Override
public void onError(Throwable e) {
exec.submit(() -> o.onError(e));
}@Override
public void onCompleted() {
exec.submit(() -> o.onCompleted());
}
});
};
这种实现方式要求executor是单线程的,否则就需要保证FIFO以及不会有来自同一个源的多个任务被同时执行。
取消订阅的处理将更加复杂,因为我们必须保持所有正在执行中的任务,当它们执行结束时移除它们,以及保证每个任务都能被及时取消。
通过对比
subscribeOn
和observeOn
,我们可以发现subscribeOn
调度了整个source.subscribe(...)
的调用,而observeOn
则是调度每个单独的subscriber.onXXX()
调用。 所以你可以看到如果多次使用
observeOn
,内部被调度的任务,将会把subscriber.onNext
的执行调度到另一个调度器中:worker.schedule(() -> worker2.schedule(() -> subscriber.onNext(t)));
所以
observeOn
会重载调用链中指定的线程,因此最靠近subscriber
的observeOn
指定的线程,将作为最终onXXX()
执行的线程。从上面展开的等效代码我们可以看出,worker被浪费了,因为多余的调度并没有任何意义。总结
subscribeOn()
是为了使源Observable订阅的代码在指定调度器(线程)上执行,多次调用subscribeOn()
时,后面的subscribeOn()
只会改变前面的subscribeOn()
调度操作所在的线程,并不能改变最终被调度的代码执行的线程,但对于中途的代码执行的线程,还是会影响到的。 observeOn()
是为了使onXXX
方法在指定调度器(线程)上执行,多次调用observeOn()
时,每次调用都会改变数据向下传递时所在的线程。References
- Piasy: RXJAVA系列翻译
- Piasy: 拆轮子系列:拆 RxJava
推荐阅读
- android第三方框架(五)ButterKnife
- Android中的AES加密-下
- 带有Hilt的Android上的依赖注入
- Java|Java OpenCV图像处理之SIFT角点检测详解
- C语言浮点函数中的modf和fmod详解
- android|android studio中ndk的使用
- Android事件传递源码分析
- RxJava|RxJava 在Android项目中的使用(一)
- Android7.0|Android7.0 第三方应用无法访问私有库
- 深入理解|深入理解 Android 9.0 Crash 机制(二)