【Android(RxJava的使用)】 说明:
RxJava用于异步执行任务,跟创建子线程执行任务无本质区别,优点在于让代码看起来整洁优雅些,并不能减少代码量
一、加入jar包依赖(app下的build.gradle):
dependencies {
...
compile 'io.reactivex.rxjava2:rxjava:2.+'
compile 'io.reactivex.rxjava2:rxandroid:2.+'
}
二、创建Observable(发送数据):
1.通过create方法创建(需在subscribe方法中手动调用各个监听方法,<>可以为任意类型)
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
/*
需要手动调用各个监听方法
*/
e.onNext(T);
e.onComplete();
}
});
2.通过just方法创建(自动按顺序调用各个监听方法,<>可以为任意类型)
Observable observable = Observable.just(T);
3.通过fromIterable方法创建(根据传入的列表多次调用onNext方法,<>可以为任意类型)
List list = new ArrayList<>();
list.add(T);
//onNext方法会调用size次
Observable observable = Observable.fromIterable(list);
4.通过defer方法创建(延时调用,<>可以为任意类型)
Observable observable = Observable.defer(new Callable>() {
@Override
public ObservableSource extends T> call() throws Exception {
return Observable.just(T);
}
});
5.通过interval方法创建(定时执行onNext方法,只能
Observable observable = Observable.interval(5, TimeUnit.SECONDS);
//定时5秒执行onNext,第1个参数为数值,第2个参数为单位,这里为秒
6.通过range方法创建(执行指定次数的onNext方法,只能
Observable observable = Observable.range(开始值, 次数);
7.通过rangeLong方法创建(执行指定次数的onNext方法,只能
Observable observable = Observable.rangeLong(开始值, 次数);
8.通过timer方法创建(延时指定时间执行一次onNext方法,只能
Observable observable = Observable.timer(5, TimeUnit.SECONDS);
//延时5秒执行一次onNext,第1个参数为延时时间,第2个参数为单位,这里为秒
9.通过repeat方法创建(重复执行onNext方法,<>可以为任意类型)
Observable observable = Observable.just(T).repeat();
三、创建监听(监听,接收数据):
1.Observer方式(执行订阅事件后,无错误时,依次执行onSubscribe、onNext、onComplete):
Observer observer = new Observer() {
/**
*顺序:1
**/
@Override
public void onSubscribe(Disposable d) {
}/**
*顺序:2
**/
@Override
public void onNext(T arg) {
}/**
*错误回调
**/
@Override
public void onError(Throwable e) {
}/**
*顺序:3
**/
@Override
public void onComplete() {
}
};
2.Consumer方式(执行订阅事件后,accept方法会被执行):
Consumer consumer = new Consumer() {
@Override
public void accept(T arg) throws Exception {
}
};
四、执行订阅事件(执行发送数据动作):
1.执行事件:
observable.subscribe(observer);
2.取消事件:
Observable
(1)获得Disposable对象有2种方式:
方式1:
Disposable disposable = observable.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
}
});
方式2:
Disposable disposable = null;
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
...
}
(2)取消事件
disposable.dispose();
五、操作符的使用:
1.map方法(类型转换,将原数据类型转换为目标类型):
(1)创建Observable时调用map转换数据:
//A和B是2个不同的类
Observable observable = Observable.just(new A()).map(new Function() {
@Override
public B apply(A a) throws Exception {
//数据转换逻辑
B b = ...;
return b;
}
});
(2)在监听中接收的是转换后的数据类型:
Observer observer = new Observer() {
...
@Override
public void onNext(B b) {
}
};
observable.subscribe(observer);
2.flatMap方法(与fromIterable配合使用,将列表转换为单个实例,调用size次onNext方法):
List list = ...;
Observable observable = Observable.just(list).flatMap(new Function, ObservableSource>() {
@Override
public ObservableSource apply(List list) throws Exception {
return Observable.fromIterable(list);
//onNext方法会调用size次,每次收到的实参都是T的单个对象
}
});
Observer observer = ...;
observable.subscribe(observer);
3.filter方法(过滤器,test中判断条件,返回true才执行onNext):
Observable observable = ...;
observable.filter(new Predicate() {
@Override
public boolean test(T t) throws Exception {
if (条件) {
return true;
//返回true执行下一步,调用onNext
}
return false;
//返回false不会执行onNext
}
});
Observer observer = ...;
observable.subscribe(observer);
4.take方法(指定onNext方法执行次数):
Observable observable = ...;
observable.take(1).subscribe(...);
5.doOnNext方法(会在onNext方法之前执行)
Observable observable = ...;
observable.doOnNext(new Consumer() {
@Override
public void accept(T t) throws Exception {//此方法会在onNext之前执行
}
}).subscribe(...);
六、线程调度:
1.说明:
subscribeOn:设置Observable中任务的线程是哪种方式
observeOn:设置Observer中任务的运行在哪个线程
Schedulers.newThread():启动一个新的线程
Schedulers.io():内部使用了无上限的线程池
Schedulers.computation():默认调度器,内部使用了固定的线程池
Schedulers.single():单线程
Schedulers.trampoline():按顺序执行队列中的任务
2.使用:
Observable observable = ...;
observable.subscribeOn(Schedulers.io())//控制Observable中subscribe方法在子线程中执行
.observeOn(AndroidSchedulers.mainThread())//控制Observer中的onNext、onError、onComplete方法在主线程(UI线程)执行
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer arg) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
七、Backpressure策略:
1.Backpressure有以下几种模式(默认只能存128个事件的缓存池):
ERROR:缓存池溢出时,抛出MissingBackpressureException异常
BUFFER:设置更大的缓存池
DROP:超上限时丢弃掉
LATEST:同DROP,区别是最后一个事件能收到
2.使用:
(1)使用Flowable替换Observable
Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter fe) throws Exception {
fe.onNext(t);
fe.onComplete();
}
}, BackpressureStrategy.ERROR);
//此参数设置Backpressure策略
(2)使用Subscriber替换Observer,并在onSubscribe中调用Subscription.request()申请事件数量
Subscriber subscriber = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
//表示向生产者申请可以消费的事件数量
}
...
};
(3)执行订阅事件
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);