Android|RxJava2详解(二)--操作符
操作符简介
Observable和Observer只是ReactiveX的开始,他们自己只不过是标准观察者模式的轻微扩展,更适合处理事件序列而不是单个回调。
ReactiveX真正强大的是那些让你可以随意变换、组合、操作Observable发射的数据序列的操作符(Operators),这些操作符可以让你声明式地组合异步序列,同时具备回调的所有效率优势,但没有传统异步系统的嵌套回调处理的缺点。
操作符分类
【Android|RxJava2详解(二)--操作符】ReactiveX为了能够更好进行异步处理操作,定义了非常多的操作符,每个平台实现可以根据需要实现,也可以自定义更多的操作符:
- 创建Observable(Creating Observables)
-
Create
,Defer
,Empty
/Never
/Throw
,From
,Interval
,Just
,Range
,Repeat
,Start
, andTimer
- 变换Observable的Item(Transforming Observable Items)
-
Buffer
,FlatMap
,GroupBy
,Map
,Scan
, andWindow
- 过滤Observable(Filtering Observables)
-
Debounce
,Distinct
,ElementAt
,Filter
,First
,IgnoreElements
,Last
,Sample
,Skip
,SkipLast
,Take
, andTakeLast
- 组合多个Observable(Combining Observables)
-
And
/Then
/When
,CombineLatest
,Join
,Merge
,StartWith
,Switch
, andZip
- 错误处理(Error Handling Operators)
-
Catch
andRetry
- Observable工具(Utility Operators)
-
Delay
,Do
,Materialize
/Dematerialize
,ObserveOn
,Serialize
,Subscribe
,SubscribeOn
,TimeInterval
,Timeout
,Timestamp
, andUsing
- 条件及布尔判断(Conditional and Boolean Operators)
-
All
,Amb
,Contains
,DefaultIfEmpty
,SequenceEqual
,SkipUntil
,SkipWhile
,TakeUntil
, andTakeWhile
- 数学及集合操作符(Mathematical and Aggregate Operators)
-
Average
,Concat
,Count
,Max
,Min
,Reduce
, andSum
- 转换Observable(Converting Observables)
-
To
- Connectable Observable操作符(Connectable Observable Operators)
-
Connect
,Publish
,RefCount
, andReplay
- 背压操作符(Backpressure Operators)
- 一些可以进行事件/数据流控制的操作符
很多操作符都作用于Observable并返回一个Observable,这就意味着你可以一个接一个的链式使用这些操作符,链中的每个操作符都会修改之前操作符操作产生的Observable。
其它的链式调用模式,像Builder模式,也可以连续的调用一系列操作方法。Builder模式一般都是链式地修改同一个实例的属性,所以操作方法的调用顺序一般并没有什么影响,但是Observable操作符的使用顺序却很重要,因为每个操作符操作Observable后都会马上将新生成的Observable交给下一个操作符去处理。
一些“核心”的操作符
Create 创建Observable:
文章图片
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("one");
e.onNext("two");
e.onNext("three");
e.onComplete();
}
}).subscribe(new Consumer() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
});
Defer Defer操作符会等到有一个Observer订阅才会生成一个Observable,也就是说每个订阅者都有自己序列,这可以确保最后一刻(订阅时)创建的Observable包含最新的数据。
文章图片
Observable defer = Observable.defer(new Callable call() throws Excep
Object o = new Object();
System.out.println("emit: " + "object" + o.hashCode());
return Observable.just("object" + o.hashCode());
}
});
Consumer consumer0 = new Consumer() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
Consumer consumer1 = new Consumer() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
defer.subscribe(consumer0);
defer.subscribe(consumer1);
emit: object124101560
accept: object124101560
emit: object913896849
accept: object913896849
From 把其他对象或数据结构转成Observable
文章图片
RxJava2的实现为
fromArray
、fromCallable
、fromFuture
、fromIterable
、fromPublisher
等方法。Just 把一个item转换成发射这个item的Observable。Just和From类似,From会把数组或iterable或其它有序东西内部的所有item取出来发射,而Just只会简单地将数组或iterable或者其它原来的东西不做任何更改地作为一个item发射。
文章图片
Interval 创建一个每隔给定的时间间隔发射一个递增整数的Observable
文章图片
Timer 创建一个给定延迟后发射一个item(0L)的Observable
文章图片
Range 创建发射给定范围的连续整数的Observable
文章图片
Map 通过给每个item应用函数来转换要发射的item,Map操作符将返回发射函数应用结果item的新的Observable
文章图片
FlatMap FlatMap操作符会应用你指定的函数到每个源Observable要发射的item,该函数会返回一个自己发射item的Observable,然后FlatMap会merge这些新的Observable,把merge后的item序列作为新Observable的发射序列。由于是merge操作所以item发射顺序可能是交错的,如果想保证严格的发射顺序可以使用ConcatMap操作符。
文章图片
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function() {
@Override
public ObservableSource apply(@NonNull Integer integer) throws Exception {
final List list = new ArrayList<>();
for (int i = 0;
i < 3;
i++) {
list.add("I am value " + integer + "-" + i);
}
return Observable.fromIterable(list).delay(50, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});
accept: I am value 1-1
accept: I am value 2-1
accept: I am value 2-2
accept: I am value 1-2
accept: I am value 3-1
accept: I am value 3-2
Filter 只发射Observable中那些通过判定测试(predicate test)的item
文章图片
Take 只发射Observable前n个item
文章图片
Merge 把多个Observable merge为一个Observable,merge发射的item可能是交错的,且如果任何源Observable出现
onError
都会马上终止merge过程并传给最终Observable。如果想延迟onError
到merge结束可以使用MergeDelayError操作符。 文章图片
Zip 根据你给定的方法把多个Observable发射的item结合在一起,每组item结合后都作为单个item(给定方法的返回值)发射。Zip操作符严格按序应用给定的方法,所以新生成的Observable的第一个item肯定是Observable #1第一个item和Observable #2第一个item的结合(即方法返回值),新生成的Observable的第二个item肯定是Observable #1第二个item和Observable #2第二个item的结合,以此类推,只发射与 发射最少item的源Observable的发射item数 一样多的item
Delay 返回一个每发射一个源Observable的item之前都延迟给定时间的Observable,但
onError
不会被延迟。 SubscribeOn 指定Observable要操作在哪个Scheduler上。
文章图片
ObserveOn 指定Observer将要在哪个Scheduler上订阅它的Observable。
文章图片
Subscribe 把Observable和Observer连接起来,只有通过Subscribe操作符订阅Observable才能收到Observable发射的item以及
onError
、onComplete
信号。All 判断Observable发射的所有的item是否都满足指定条件。当且仅当源Observable正常终止且每个发射的item都被给定的判定函数判断为
true
时,All操作符才会返回一个只发射一个true
的Observable。如果源Observable发射的任何一个item被给定的判定函数判断为false,All操作符会返回一个只发射一个false
的Observable。 文章图片
Amb Ambiguous(模棱两可的)的缩写。对于给定的两个或多个源Observable,只发射 第一个发射item或通知(
onError
或onCompleted
)的那个Observable 的所有item及通知,Amb会忽略并丢弃其它源Observable发射的item及通知。 文章图片
Contains 判断Observable是否发射了指定的item,如果源Observable发射了指定的item就返回一个发射
true
的Observable,如果源Observable直到结束都没发射指定的item就返回一个发射false
的Observable。类似的IsEmpty操作符会在源Observable直到终止都没发射任何item时返回一个发射true
的Observable。 文章图片
SkipUntil 在第二个Observable发射一个item之前丢弃源Observable要发射的item,之后会镜像发射源Observable的item。
文章图片
SkipWhile 在你给定的条件变成false之前丢弃源Observable要发射的item,之后会镜像发射源Observable的item。
文章图片
TakeUntil TakeUtil会镜像源Observable并监视你给定的第二个Observable,在第二个Observable发射一个item或终止信号(
onError
或onCompleted
)后丢弃源Observable的任何item(即停止镜像源Observable并终止)。 文章图片
Concat 简单地将多个Observable连接(无交错的)成一个Observable,即只有第一个Observable的item都被发射完才会发射第二个Observable的item,以此类推。由于Concat会等待订阅 给定的多个Observable 直到之前的Observable完成,如果你想连接一个"热Observable"(在被订阅之前就立即发射item的Observable),Concat将看不到这些也就不会发射任何item。
文章图片
References
- ReactiveX.io
推荐阅读
- android第三方框架(五)ButterKnife
- Android中的AES加密-下
- 带有Hilt的Android上的依赖注入
- Java|Java OpenCV图像处理之SIFT角点检测详解
- C语言浮点函数中的modf和fmod详解
- android|android studio中ndk的使用
- Android事件传递源码分析
- RxJava|RxJava 在Android项目中的使用(一)
- Android7.0|Android7.0 第三方应用无法访问私有库
- 深入理解|深入理解 Android 9.0 Crash 机制(二)