RxJava2 只看这一篇文章就够了

0. 简介 RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。
RxJava 有以下三个基本的元素:

  • 被观察者(Observable)
  • 观察者(Observer)
  • 订阅(subscribe)
下面来说说以上三者是如何协作的:
首先在 gradle 文件中添加依赖:
implementation 'io.reactivex.rxjava2:rxjava:2.1.4' implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

创建被观察者:
Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName()); e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } });

创建观察者:
Observer observer = new Observer() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "======================onSubscribe"); }@Override public void onNext(Integer integer) { Log.d(TAG, "======================onNext " + integer); }@Override public void onError(Throwable e) { Log.d(TAG, "======================onError"); }@Override public void onComplete() { Log.d(TAG, "======================onComplete"); } }; observable.subscribe(observer); 复制代码这里其实也可以使用链式调用: Observable.create(new ObservableOnSubscribe < Integer > () { @Override public void subscribe(ObservableEmitter < Integer > e) throws Exception { Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName()); e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }) .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "======================onSubscribe"); }@Override public void onNext(Integer integer) { Log.d(TAG, "======================onNext " + integer); }@Override public void onError(Throwable e) { Log.d(TAG, "======================onError"); }@Override public void onComplete() { Log.d(TAG, "======================onComplete"); } }); 复制代码被观察者发送的事件有以下几种,总结如下表:事件种类 作用onNext() 发送该事件时,观察者会回调 onNext() 方法onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送其实可以把 RxJava 比喻成一个做果汁,家里有很多种水果(要发送的原始数据),你想榨点水果汁喝一下,这时候你就要想究竟要喝什么水果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。 总结如下图:![图片](https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2018/5/26/1639a8ee56b13c41~tplv-t2oaga2asx-zoom-in-crop-mark:3024:0:0:0.awebp)下面就来讲解 RxJava 各种常见的操作符。 1. 创建操作符 以下就是讲解创建被观察者的各种操作符。 1.1 create() 方法预览: public static Observable create(ObservableOnSubscribe source) 复制代码有什么用: 创建一个被观察者 怎么用: Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("Hello Observer"); e.onComplete(); } }); 复制代码上面的代码非常简单,创建 ObservableOnSubscribe 并重写其 subscribe 方法,就可以通过 ObservableEmitter 发射器向观察者发送事件。 以下创建一个观察者,来验证这个被观察者是否成功创建。 Observer observer = new Observer() { @Override public void onSubscribe(Disposable d) {}@Override public void onNext(String s) { Log.d("chan","=============onNext " + s); }@Override public void onError(Throwable e) {}@Override public void onComplete() { Log.d("chan","=============onComplete "); } }; observable.subscribe(observer); 复制代码打印结果: 05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer =============onComplete 复制代码1.2 just() 方法预览: public static Observable just(T item) ...... public static Observable just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 复制代码有什么用? 创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。 怎么用? Observable.just(1, 2, 3) .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "=================onSubscribe"); }@Override public void onNext(Integer integer) { Log.d(TAG, "=================onNext " + integer); }@Override public void onError(Throwable e) { Log.d(TAG, "=================onError "); }@Override public void onComplete() { Log.d(TAG, "=================onComplete "); } }); 复制代码上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果: 05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe =================onNext 1 =================onNext 2 =================onNext 3 =================onComplete 复制代码1.3 From 操作符 1.3.1fromArray() 方法预览: public static Observable fromArray(T... items)作者:玉刚说链接:

有什么用? 【RxJava2 只看这一篇文章就够了】这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。
怎么用?
Integer array[] = {1, 2, 3, 4}; Observable.fromArray(array) .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "=================onSubscribe"); }@Override public void onNext(Integer integer) { Log.d(TAG, "=================onNext " + integer); }@Override public void onError(Throwable e) { Log.d(TAG, "=================onError "); }@Override public void onComplete() { Log.d(TAG, "=================onComplete "); } });

复制代码代码和 just() 基本上一样,直接看打印结果:
05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe =================onNext 1 =================onNext 2 =================onNext 3 =================onNext 4 =================onComplete

1.3.2 fromCallable() 方法预览:
public static Observable fromCallable(Callable supplier)

复制代码有什么用?
这里的 Callablejava.util.concurrent 中的 Callable,CallableRunnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。
怎么用?
Observable.fromCallable(new Callable < Integer > () {@Override public Integer call() throws Exception { return 1; } }) .subscribe(new Consumer < Integer > () { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "================accept " + integer); } });

05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1 复制代码1.3.3 fromFuture() 方法预览: public static Observable fromFuture(Future future)

复制代码有什么用?
参数中的 Futurejava.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

    推荐阅读