RxJava操作符实战(1)-线程切换
前言
在没有学习RxJava时我们切换线程可以有以下几种方法:
(1) 开启一个子线程处理耗时操作,当操作处理完后可以使用handler发送消息通知主线程更新内容,或者使用 runOnUiThread。
(2)AsyncTask,在其doInBackground方法中执行耗时的操作,调用publishProgress方法通知主线程,然后在onProgressUpdate中更新进度显示,在onPostExecute中显示最终结果。
那么在学习了RxJava后,线程间的切换就变得非常简单了
实例环境
在app中展示内容时,有时需要在网上拉取数据或者从数据库中获取数据,为了不阻塞主线程则需要开启一个新的子线程,将这些处理操作放在子线程中,并在处理过程中和处理完了以后通知UI线程更新数据,这就形成了线程间的切换。现在我们就是用Rxjava进行线程间的切换。
代码如下:
Observable.create(new ObservableOnSubscribe(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e("subscribe","当先的线程是:"+Thread.currentThread().getName());
emitter.onNext("1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {}@Override
public void onNext(String s) {
Log.e("onNext","当先的线程是:"+Thread.currentThread().getName());
Log.e("onNext",s);
}@Override
public void onError(Throwable e) {
Log.e("onError",e.getMessage());
}@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
});
测试结果如下:
07-10 16:54:10.858 15439-15457/? E/subscribe: 当先的线程是:RxCachedThreadScheduler-1
07-10 16:54:10.879 15439-15439/? E/onNext: 当先的线程是:main
07-10 16:54:10.879 15439-15439/? E/onNext: 1
07-10 16:54:10.879 15439-15439/? E/onComplete: onComplete
需要进行说明的是需要在后台线程进行处理的放在Observable的subscribe,subscribeOn(Schedulers)决定了subscribe方法在那个线程中进行处理,需要在UI线程进行数据更新的数据可以从Observable的onNext方法中获取,当出现了错误时可以从onError中获取,处理完毕后会调用onComplete方法。而Observable所在的线程由observeOn(Schedules)决定,所以上面的代码就将数据处理放在了子线程中,而获取数据的操作放在了UI线程上。那么如果不指定Observable和Observer所在的线程呢?那么他们俩均会在当前线程中执行。测试结果如下:
07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/subscribe: 当先的线程是:main
07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onNext: 当先的线程是:main
07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onNext: 1
07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onComplete: onComplete
可以看到当不指定Observable和Observer的线程时,他们俩均运行在UI线程中。
那么都有哪些线程可以指定呢?Schedules里为我们提供了一些线程:
-
Schedulers.computation()
:用于计算任务,默认线程数等于处理器的数量。 -
Schedulers.from(Executor executor)
:使用Executor
作为调度器,关于Executor
框架可以参考这篇文章:多线程知识梳理(5) - 线程池四部曲之 Executor 框架。*Schedulers.io(?)
:用于IO
密集型任务,例如访问网络、数据库操作等,也是我们最常使用的。 -
Schedulers.newThread(?)
:为每一个任务创建一个新的线程。 -
Schedulers.trampoline(?)
:当其它排队的任务完成后,在当前线程排队开始执行。*Schedulers.single()
:所有任务共用一个后台线程。
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
AndroidSchedulers.mainThread():运行在应用程序的主线程。
AndroidSchedulers.from(Looper looper):运行在该looper对应的线程当中。
CompositeDisposable 对下游进行管理 如果Activity要被销毁时,我们的后台任务没有执行完,那么就会导致Activity不能正常回收,而对于每一个Observer,都会有一个Disposable对象用于管理,而RxJava提供了一个CompositeDisposable类用于管理这些Disposable,我们只需要将其将入到该集合当中,在Activity的onDestroy方法中,调用它的clear方法,就能避免内存泄漏的发生。Disposable可以切断Observable和Observer之间的联系。代码如下:
Observable observable = Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observer observer = new Observer() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable=d;
}@Override
public void onNext(Integer integer) {
if (integer.equals(3)){
disposable.dispose();
}
Log.e("onNext", integer + "");
}@Override
public void onError(Throwable e) {}@Override
public void onComplete() {}
};
observable.subscribe(observer);
测试结果如下:
07-10 17:12:22.990 17585-17585/? E/onNext: 1
07-10 17:12:22.990 17585-17585/? E/onNext: 2
07-10 17:12:22.990 17585-17585/? E/onNext: 3
【RxJava操作符实战(1)-线程切换】从结果可以看出,当发射3,切断Observable和Observer以后observer便无法接收到observable发送的4,5了。
推荐阅读
- Shell-Bash变量与运算符
- 2.6|2.6 Photoshop操作步骤的撤消和重做 [Ps教程]
- MongoDB,Wondows下免安装版|MongoDB,Wondows下免安装版 (简化版操作)
- 一起来学习C语言的字符串转换函数
- C语言字符函数中的isalnum()和iscntrl()你都知道吗
- 在线版的迅捷思维导图怎么操作()
- 操作系统|[译]从内部了解现代浏览器(1)
- RxJava|RxJava 在Android项目中的使用(一)
- 字符串拼接成段落,换行符(\n)如何只执行n-1次
- 爬虫数据处理HTML转义字符