RxJava操作符实战(1)-线程切换

前言 在没有学习RxJava时我们切换线程可以有以下几种方法:
(1) 开启一个子线程处理耗时操作,当操作处理完后可以使用handler发送消息通知主线程更新内容,或者使用 runOnUiThread。
(2)AsyncTask,在其doInBackground方法中执行耗时的操作,调用publishProgress方法通知主线程,然后在onProgressUpdate中更新进度显示,在onPostExecute中显示最终结果。
那么在学习了RxJava后,线程间的切换就变得非常简单了
实例环境 在app中展示内容时,有时需要在网上拉取数据或者从数据库中获取数据,为了不阻塞主线程则需要开启一个新的子线程,将这些处理操作放在子线程中,并在处理过程中和处理完了以后通知UI线程更新数据,这就形成了线程间的切换。现在我们就是用Rxjava进行线程间的切换。
代码如下:

Observable.create(new ObservableOnSubscribe(){ @Override public void subscribe(ObservableEmitter emitter) throws Exception { Log.e("subscribe","当先的线程是:"+Thread.currentThread().getName()); emitter.onNext("1"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) {}@Override public void onNext(String s) { Log.e("onNext","当先的线程是:"+Thread.currentThread().getName()); Log.e("onNext",s); }@Override public void onError(Throwable e) { Log.e("onError",e.getMessage()); }@Override public void onComplete() { Log.e("onComplete","onComplete"); } });

测试结果如下:
07-10 16:54:10.858 15439-15457/? E/subscribe: 当先的线程是:RxCachedThreadScheduler-1 07-10 16:54:10.879 15439-15439/? E/onNext: 当先的线程是:main 07-10 16:54:10.879 15439-15439/? E/onNext: 1 07-10 16:54:10.879 15439-15439/? E/onComplete: onComplete

需要进行说明的是需要在后台线程进行处理的放在Observable的subscribe,subscribeOn(Schedulers)决定了subscribe方法在那个线程中进行处理,需要在UI线程进行数据更新的数据可以从Observable的onNext方法中获取,当出现了错误时可以从onError中获取,处理完毕后会调用onComplete方法。而Observable所在的线程由observeOn(Schedules)决定,所以上面的代码就将数据处理放在了子线程中,而获取数据的操作放在了UI线程上。那么如果不指定Observable和Observer所在的线程呢?那么他们俩均会在当前线程中执行。测试结果如下:
07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/subscribe: 当先的线程是:main 07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onNext: 当先的线程是:main 07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onNext: 1 07-10 16:58:05.763 16229-16229/com.zhqy.rxjavathreaddemo E/onComplete: onComplete

可以看到当不指定Observable和Observer的线程时,他们俩均运行在UI线程中。
那么都有哪些线程可以指定呢?Schedules里为我们提供了一些线程:
  • Schedulers.computation():用于计算任务,默认线程数等于处理器的数量。
  • Schedulers.from(Executor executor):使用Executor作为调度器,关于Executor框架可以参考这篇文章:多线程知识梳理(5) - 线程池四部曲之 Executor 框架。* Schedulers.io(?):用于IO密集型任务,例如访问网络、数据库操作等,也是我们最常使用的。
  • Schedulers.newThread(?):为每一个任务创建一个新的线程。
  • Schedulers.trampoline(?):当其它排队的任务完成后,在当前线程排队开始执行。* Schedulers.single():所有任务共用一个后台线程。
这就是RxJava中为我们提供的线程,但如果你是Android开发人员,你还可以使用AndroidSchedulers所提供的线程。
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

AndroidSchedulers.mainThread():运行在应用程序的主线程。
AndroidSchedulers.from(Looper looper):运行在该looper对应的线程当中。
CompositeDisposable 对下游进行管理 如果Activity要被销毁时,我们的后台任务没有执行完,那么就会导致Activity不能正常回收,而对于每一个Observer,都会有一个Disposable对象用于管理,而RxJava提供了一个CompositeDisposable类用于管理这些Disposable,我们只需要将其将入到该集合当中,在Activity的onDestroy方法中,调用它的clear方法,就能避免内存泄漏的发生。Disposable可以切断Observable和Observer之间的联系。代码如下:
Observable observable = Observable.just(1, 2, 3, 4, 5) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); Observer observer = new Observer() { Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable=d; }@Override public void onNext(Integer integer) { if (integer.equals(3)){ disposable.dispose(); } Log.e("onNext", integer + ""); }@Override public void onError(Throwable e) {}@Override public void onComplete() {} }; observable.subscribe(observer);

测试结果如下:
07-10 17:12:22.990 17585-17585/? E/onNext: 1 07-10 17:12:22.990 17585-17585/? E/onNext: 2 07-10 17:12:22.990 17585-17585/? E/onNext: 3

【RxJava操作符实战(1)-线程切换】从结果可以看出,当发射3,切断Observable和Observer以后observer便无法接收到observable发送的4,5了。

    推荐阅读