Kotlin|RxKotlin使用介绍

1. RxKotlin是什么? 虽然ReactiveX在各个平台都有实现,例如Java上RxJava,Javascript上的RxJs。但是RxKotlin并非Kotlin上的实现,RxKotlin仅仅是一个适用于Kotlin的RxJava的扩展库。


2. 导入 for maven

io.reactivex.rxjava2 rxkotlin 2.3.0

for gradle
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'

RxKotlin的版本号和RxJava的一致,如果RxJava1.x就用对应的RxKotlin1.x

3. 创建 3.1 创建Observable、Flowable RxKotlin提供了toObservabletoFlowable等面向Collections的扩展方法,可以快速创建Observable或者Flowable
val observable = listOf(1, 1, 2, 3).toObservable() observable.test().assertValues(1, 1, 2, 3)

val flowable = listOf(1, 1, 2, 3).toFlowable() flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))

3.2 创建Completable 通过toCompletable,我们将一个Actions、Callables、 Futures或则一个无参的lambda转成一个Completable:
var value = https://www.it610.com/article/0 val completable = { value = 3 }.toCompletable() assertFalse(completable.test().isCancelled()) assertEquals(3, value)


4. Map化 4.1 转Map toMap将一个Pair的Observable转成一个map的Observable,无需再通过RxJava的map操作符
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMap() assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())

4.2 转Mutimap 上面例子中,同样的key("a")会被后面的值覆盖,如果想保留所有的value,可以使用toMultimap
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMultimap() assertEquals( mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))), map.blockingGet())


5. 合并 5.1 合并Observables Subject的扩展方法mergeAll可以将多个Observable合并为一个无序的Observable
val subject = PublishSubject.create>() val observable = subject.mergeAll()

相当于
val observable = subject.flatMap { it }

另外,还提供了concatAll(相当于concatMap ,合并成有序的流),以及switchLatest (相当于switchMap ,取消前一个Observable的发射)
5.2 合并Completable、Maybe、Single 针对Completable、Maybe、Single等类型,分别提供了对应的合并操作符:
例如mergeAllMaybes
val subject = PublishSubject.create>() val observable = subject.mergeAllMaybes() subject.onNext(Maybe.just(1)) subject.onNext(Maybe.just(2)) subject.onNext(Maybe.empty()) subject.onNext(Maybe.error(Exception("error"))) subject.onNext(Maybe.just(3)) observable.test().assertValues(1, 2).assertError(Exception::class.java)

5.3 合并Iterable merge() 以及 mergeDelayError() 可以将一个集合中的多个Observable合并为一个
val observables = mutableListOf(Observable.just("first", "second")) val observable = observables.merge() observables.add(Observable.just("third", "fourth")) observable.test().assertValues("first", "second", "third", "fourth")

merge遇到错误就立即停止
// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth")

mergeDelayError会跳过error处理到最后
// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth", "fifth")


6. 类型转换 cast 用来进行类型强转:
val observable = Observable.just(1, 1, 2, 3) observable.cast().test().assertValues(1, 1, 2, 3)

ofType的作用是客户根据类型进行过滤
val observable = Observable.just(1, "and", 2, "and") observable.ofType().test().assertValues(1, 2)


7. 语法优化 RxKotlin中还有很多优化虽然没有创建新的扩展操作符,但是基于Kotlin的语法优势在写法上进行了优化:
比如subscribeBy 基于Kotlin的命名可选参数机制,可以替代subscribe的使用
Observable.just(1).subscribeBy(onNext = { println(it) }

再比如,RxJava中的很多操作符必须使用RxJava内置的接口,现在可以用Kotlin的lambda替换这些接口,如Observable.zip,传统的写法
Observable.zip(Observable.just(1), Observable.just(2), BiFunction { a, b -> a + b })

【Kotlin|RxKotlin使用介绍】RxKotlin提供了与之对应的Observables.zip,无需借助BiFunction接口写法上更加简单:
Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }

    推荐阅读