Android(RxJava的使用)

【Android(RxJava的使用)】 说明:
RxJava用于异步执行任务,跟创建子线程执行任务无本质区别,优点在于让代码看起来整洁优雅些,并不能减少代码量


一、加入jar包依赖(app下的build.gradle):

dependencies { ... compile 'io.reactivex.rxjava2:rxjava:2.+' compile 'io.reactivex.rxjava2:rxandroid:2.+' }



二、创建Observable(发送数据):
1.通过create方法创建(需在subscribe方法中手动调用各个监听方法,<>可以为任意类型)

Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { /* 需要手动调用各个监听方法 */ e.onNext(T); e.onComplete(); } });



2.通过just方法创建(自动按顺序调用各个监听方法,<>可以为任意类型)

Observable observable = Observable.just(T);



3.通过fromIterable方法创建(根据传入的列表多次调用onNext方法,<>可以为任意类型)

List list = new ArrayList<>(); list.add(T); //onNext方法会调用size次 Observable observable = Observable.fromIterable(list);



4.通过defer方法创建(延时调用,<>可以为任意类型)

Observable observable = Observable.defer(new Callable>() { @Override public ObservableSource call() throws Exception { return Observable.just(T); } });



5.通过interval方法创建(定时执行onNext方法,只能类型,onNext实参从0开始,每次+1)

Observable observable = Observable.interval(5, TimeUnit.SECONDS); //定时5秒执行onNext,第1个参数为数值,第2个参数为单位,这里为秒


6.通过range方法创建(执行指定次数的onNext方法,只能类型,onNext实参从开始值开始,每次+1)

Observable observable = Observable.range(开始值, 次数);


7.通过rangeLong方法创建(执行指定次数的onNext方法,只能类型,onNext实参从开始值开始,每次+1)

Observable observable = Observable.rangeLong(开始值, 次数);


8.通过timer方法创建(延时指定时间执行一次onNext方法,只能类型,onNext实参为0)

Observable observable = Observable.timer(5, TimeUnit.SECONDS); //延时5秒执行一次onNext,第1个参数为延时时间,第2个参数为单位,这里为秒


9.通过repeat方法创建(重复执行onNext方法,<>可以为任意类型)

Observable observable = Observable.just(T).repeat();



三、创建监听(监听,接收数据):
1.Observer方式(执行订阅事件后,无错误时,依次执行onSubscribe、onNext、onComplete):

Observer observer = new Observer() { /** *顺序:1 **/ @Override public void onSubscribe(Disposable d) { }/** *顺序:2 **/ @Override public void onNext(T arg) { }/** *错误回调 **/ @Override public void onError(Throwable e) { }/** *顺序:3 **/ @Override public void onComplete() { } };



2.Consumer方式(执行订阅事件后,accept方法会被执行):

Consumer consumer = new Consumer() { @Override public void accept(T arg) throws Exception { } };



四、执行订阅事件(执行发送数据动作):
1.执行事件:

observable.subscribe(observer);



2.取消事件:
Observable observable = ...;
(1)获得Disposable对象有2种方式:
方式1:

Disposable disposable = observable.subscribe(new Consumer() { @Override public void accept(String s) throws Exception { } });



方式2:

Disposable disposable = null; Observer observer = new Observer() { @Override public void onSubscribe(Disposable d) { disposable = d; } ... }



(2)取消事件

disposable.dispose();




五、操作符的使用:
1.map方法(类型转换,将原数据类型转换为目标类型):
(1)创建Observable时调用map转换数据:

//A和B是2个不同的类 Observable observable = Observable.just(new A()).map(new Function() { @Override public B apply(A a) throws Exception { //数据转换逻辑 B b = ...; return b; } });



(2)在监听中接收的是转换后的数据类型:

Observer observer = new Observer() { ... @Override public void onNext(B b) { } }; observable.subscribe(observer);




2.flatMap方法(与fromIterable配合使用,将列表转换为单个实例,调用size次onNext方法):

List list = ...; Observable observable = Observable.just(list).flatMap(new Function, ObservableSource>() { @Override public ObservableSource apply(List list) throws Exception { return Observable.fromIterable(list); //onNext方法会调用size次,每次收到的实参都是T的单个对象 } }); Observer observer = ...; observable.subscribe(observer);




3.filter方法(过滤器,test中判断条件,返回true才执行onNext):

Observable observable = ...; observable.filter(new Predicate() { @Override public boolean test(T t) throws Exception { if (条件) { return true; //返回true执行下一步,调用onNext } return false; //返回false不会执行onNext } }); Observer observer = ...; observable.subscribe(observer);



4.take方法(指定onNext方法执行次数):

Observable observable = ...; observable.take(1).subscribe(...);



5.doOnNext方法(会在onNext方法之前执行)

Observable observable = ...; observable.doOnNext(new Consumer() { @Override public void accept(T t) throws Exception {//此方法会在onNext之前执行 } }).subscribe(...);



六、线程调度:
1.说明:
subscribeOn:设置Observable中任务的线程是哪种方式
observeOn:设置Observer中任务的运行在哪个线程


Schedulers.newThread():启动一个新的线程
Schedulers.io():内部使用了无上限的线程池
Schedulers.computation():默认调度器,内部使用了固定的线程池
Schedulers.single():单线程
Schedulers.trampoline():按顺序执行队列中的任务


2.使用:

Observable observable = ...; observable.subscribeOn(Schedulers.io())//控制Observable中subscribe方法在子线程中执行 .observeOn(AndroidSchedulers.mainThread())//控制Observer中的onNext、onError、onComplete方法在主线程(UI线程)执行 .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer arg) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });



七、Backpressure策略:
1.Backpressure有以下几种模式(默认只能存128个事件的缓存池):
ERROR:缓存池溢出时,抛出MissingBackpressureException异常
BUFFER:设置更大的缓存池
DROP:超上限时丢弃掉
LATEST:同DROP,区别是最后一个事件能收到


2.使用:
(1)使用Flowable替换Observable

Flowable flowable = Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter fe) throws Exception { fe.onNext(t); fe.onComplete(); } }, BackpressureStrategy.ERROR); //此参数设置Backpressure策略



(2)使用Subscriber替换Observer,并在onSubscribe中调用Subscription.request()申请事件数量

Subscriber subscriber = new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); //表示向生产者申请可以消费的事件数量 } ... };



(3)执行订阅事件

flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);



    推荐阅读