RxJava——线程控制
前言
对于一般的需求场景,需要在子线程中实现耗时的操作;然后回到主线程实现 UI操作应用到 RxJava模型中,可理解为:
被观察者 (Observable) 在 子线程 中生产事件(如实现耗时操作等等)实现方式 采用 RxJava内置的线程调度器( Scheduler ),即通过 功能性操作符subscribeOn() & observeOn()实现
观察者(Observer)在 主线程 接收 & 响应事件(即实现UI操作
subscribeOn
使用该方法可以指定被观察者执行方法位于的线程。
【RxJava——线程控制】注意:该方法调用只能生效一次,即第一次调用后,再调用observeOnsubscribeOn
无法改变其执行线程的位置。
使用该方法指定观察者事件响应位于的线程。
注意:该方法可调用多次,每一次调用代码实现observeOn
,后续操作线程就会切换一次,这里的后续操作指的是调用observeOn
后,在下一个observeOn
前指定的事件监听操作
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.v("rxJava","Observable thread :" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()) // 指定被观察者执行线程
.observeOn(Schedulers.newThread()) // 切换到新线程
.doOnNext(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.v("rxJava","do on next thread :" + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.newThread()) // 切换到另一个新线程
.filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
Log.v("rxJava","filter thread:"+ Thread.currentThread().getName());
return true;
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.v("rxJava","Observer onSubscribe thread :" + Thread.currentThread().getName());
}@Override
public void onNext(Integer value) {
Log.v("rxJava","Observer onNext thread :" + Thread.currentThread().getName());
}@Override
public void onError(Throwable e) {
Log.v("rxJava","Observer onError thread :" + Thread.currentThread().getName());
}@Override
public void onComplete() {
Log.v("rxJava","Observer onComplete thread :" + Thread.currentThread().getName());
}
});
运行结果
V/rxJava: Observer onSubscribe thread :main注意:onSubscribe方法总是执行在调用subscribe方法的线程
V/rxJava: Observable thread :RxCachedThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: Observer onComplete thread :RxNewThreadScheduler-2
线程可选参数 schedulers.io()
这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复StrictMode检测到的违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。 重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。
Schedulers.computation()
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
Schedulers.immediate()
这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
Schedulers.newThread()
指定一个新线程来执行任务,如果有多个步骤且每个步骤都使用这个方法调度,则每个步骤都是在一个新的线程中,而不是同一个线程。
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。
推荐阅读
- 急于表达——往往欲速则不达
- 慢慢的美丽
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量
- 2019-02-13——今天谈梦想()
- 考研英语阅读终极解决方案——阅读理解如何巧拿高分
- Ⅴ爱阅读,亲子互动——打卡第178天
- 低头思故乡——只是因为睡不着
- 取名——兰
- 每日一话(49)——一位清华教授在朋友圈给大学生的9条建议
- 广角叙述|广角叙述 展众生群像——试析鲁迅《示众》的展示艺术