Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)
文章图片
最近有一些文章建议大家放弃RxJava。在AAC推荐的MVVM最佳实践中重度依赖了RxJava,是不是也可以考虑去掉其中的RxJava呢?
文章图片
RxJava的问题
- 功能过剩
MVVM中使用RxJava主要用来进行异步请求以及订阅,但RxJava的能力远不止于此,他更是一个操作符众多的流式响应式框架,功能众多确不被熟知和使用,除了徒增包体积以为,还有误用操作符造成bug的风险
- Kotlin不友好
RxJava虽然可以在Kotlin中使用,但毕竟本身是用Java写的,对函数式Lambda的支持不够Kotlin,使用了很多自定义的Function类型,与Kotlin一起使用时容易混淆,RxKotlin的扩展包仅仅是增加了一些Kt扩展方法,与真正用Kotlin实现的ReactiveX框架还是有差距的。
- 生命周期控制
与LiveData相比,RxJava无法自动感知生命周期,需要手动进行dispose,容易遗忘造成泄露。当然可以通过定义一些Observable的扩展方法实现一定程度的自动化,但总体来说RxJava不是专门为GUI实现的框架,在LivecycleAware方面的建设比较欠缺
Promise
/Future
、Android的Handler
/AsyncTask
等来说已经是大大的进步了,但是随着Coroutine的推出,很多场景可以优先考虑使用Coroutine替换使用Coroutine替换 随着Coroutine的不断完善,在异步调用方面基本上可以覆盖RxJava的所有类型。
具体可以参考 基于RxJava项目的Coroutine改造 , 其中介绍了Single请求场景中的改造Coroutine改造。这里想介绍一下Observable/Subject如何用Coroutine进行改造。这种场景一般推荐使用Channel。
Channel与BroadcastChannel Channel的本质就是一个阻塞队列,Coroutine中有两种:
- Channel :1个生产端对应1个消费端
- BroadcastChannel :一个生产端对应n个消费端
// Channel Sample
val c = ArrayChannel>(2) // 缓冲区capacity=2
launch(CommonPool) {
c.consumeEach {
Log.d("KT","${it}")
}
}launch(UI) {
c.send("a")
c.send("b")
c.send("c")
}// BroadcastChannel Sample
launch(UI) {
val bc = ArrayBroadcastChannel>(10)
bc.send("A")
bc.send("B")// Consumer1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// 执行不到
}// Consumer2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// 执行不到
}delay(2000)
bc.send("C")
bc.send("D")
}
输出结果
KT a
b
cKT-BROADCAST-1C
KT-BROADCAST-2C
D
KT-BROADCAST-1D
openSubscription
创建多个Subscription实例实现分别订阅。Channel的数据必须在订阅之后发送才能让消费端收到,反之则收不到数据send()与offer() send与offer都是向Channel添加数据
- send()
只能在协程中使用(launch{… }),超出缓冲数量依然可以继续添加 - offer()
可以在协程外调用,超出缓冲区数量时返回false,无法继续添加
launch(UI) {
val bc = ConflatedBroadcastChannel>()
bc.send("A")
bc.send("B")// Consumer1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
}// Consumer2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
}delay(2000)
bc.send("C")
bc.send("D")Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}")
}
KT-BROADCAST-1B
KT-BROADCAST-2B
KT-BROADCAST-1C
KT-BROADCAST-2C
D
KT-BROADCAST-LATESTD
KT-BROADCAST-1D
能够收到订阅之前最后的数据B
与Rx对比 Coroutine的Channel与Rx的Observable/Subject的对照关系
Rx Observable/Subject | Coroutine Channel |
---|---|
Channel | Observable (Hot Stream) |
ArrayBroadChannel | PublishSubject (Hot Stream) |
ConflatedBroadcastChannel | BehaviorSubject (Hot Stream) |
- Observable接口可以改造为Channel
- Subject接口可以改造为BroadChannel
// Address地址
interface AddressRepository {
// 订阅Address的变更通知
val addresses: BroadcastChannel>// 异步获取指定条件的Address
fun filter(criteria: String): Channel>// 异步判断Address是否存在
suspend fun exists(name: String): Boolean
}
最后 【Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)】本文主要介绍了使用Coroutine替代Rx的Observable、Subject等接口的基本思路,当然,某些场景下RxJava借助其丰富的操作符,依然可以发挥不可替代的作用。例如短时间内收到大量数据时,可以使用debounce进行防抖;当需要处理多个异步流时使用
concat
、combineLatest
等实现一些复杂逻辑。推荐阅读
- RxJava|RxJava 在Android项目中的使用(一)
- android防止连续点击的简单实现(kotlin)
- 2020年,告别焦虑的自己,2021年,期待满意的自己。
- retrofit2-kotlin-coroutines-adapter|retrofit2-kotlin-coroutines-adapter 超时引起的崩溃
- 加油自己
- Kotlin泛型的高级特性(六)
- Kotlin基础(10)-代理模式在kotlin中的使用
- Android|Android Kotlin实现AIDL跨进程通信
- 我欠她一个温柔的告别和一个抱歉
- 告别无效忙碌、低效生活,“重要”“紧急”四象限法则轻松搞定