Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)

Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)
文章图片

最近有一些文章建议大家放弃RxJava。在AAC推荐的MVVM最佳实践中重度依赖了RxJava,是不是也可以考虑去掉其中的RxJava呢?
Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)
文章图片

RxJava的问题

  1. 功能过剩
    MVVM中使用RxJava主要用来进行异步请求以及订阅,但RxJava的能力远不止于此,他更是一个操作符众多的流式响应式框架,功能众多确不被熟知和使用,除了徒增包体积以为,还有误用操作符造成bug的风险
  2. Kotlin不友好
    RxJava虽然可以在Kotlin中使用,但毕竟本身是用Java写的,对函数式Lambda的支持不够Kotlin,使用了很多自定义的Function类型,与Kotlin一起使用时容易混淆,RxKotlin的扩展包仅仅是增加了一些Kt扩展方法,与真正用Kotlin实现的ReactiveX框架还是有差距的。
  3. 生命周期控制
    与LiveData相比,RxJava无法自动感知生命周期,需要手动进行dispose,容易遗忘造成泄露。当然可以通过定义一些Observable的扩展方法实现一定程度的自动化,但总体来说RxJava不是专门为GUI实现的框架,在LivecycleAware方面的建设比较欠缺
虽然RxJava有诸多问题,但是相对于JDK的Promise/Future、Android的Handler/AsyncTask等来说已经是大大的进步了,但是随着Coroutine的推出,很多场景可以优先考虑使用Coroutine替换

使用Coroutine替换 随着Coroutine的不断完善,在异步调用方面基本上可以覆盖RxJava的所有类型。
具体可以参考 基于RxJava项目的Coroutine改造 , 其中介绍了Single请求场景中的改造Coroutine改造。这里想介绍一下Observable/Subject如何用Coroutine进行改造。这种场景一般推荐使用Channel。
Channel与BroadcastChannel Channel的本质就是一个阻塞队列,Coroutine中有两种:
  • Channel :1个生产端对应1个消费端
  • BroadcastChannel :一个生产端对应n个消费端
// Channel Sample val c = ArrayChannel>(2) // 缓冲区capacity=2 launch(CommonPool) { c.consumeEach { Log.d("KT","${it}") } }launch(UI) { c.send("a") c.send("b") c.send("c") }// BroadcastChannel Sample launch(UI) { val bc = ArrayBroadcastChannel>(10) bc.send("A") bc.send("B")// Consumer1 launch(newSingleThreadContext("threadA")) { val subscription1: SubscriptionReceiveChannel> = bc.openSubscription() subscription1.consumeEach { Log.d("KT-BROADCAST-1","${it}") } // 执行不到 }// Consumer2 launch(newSingleThreadContext("threadB")) { val subscription2: SubscriptionReceiveChannel> = bc.openSubscription() subscription2.consumeEach { Log.d("KT-BROADCAST-2","${it}") } // 执行不到 }delay(2000) bc.send("C") bc.send("D") }

输出结果
KT a b cKT-BROADCAST-1C KT-BROADCAST-2C D KT-BROADCAST-1D

openSubscription创建多个Subscription实例实现分别订阅。Channel的数据必须在订阅之后发送才能让消费端收到,反之则收不到数据
send()与offer() send与offer都是向Channel添加数据
  • send()
    只能在协程中使用(launch{… }),超出缓冲数量依然可以继续添加
  • offer()
    可以在协程外调用,超出缓冲区数量时返回false,无法继续添加
ArrayBroadcastChannel与 ConflatedBroadcastChannel ArrayBroadcastChannel与 ConflatedBroadcastChannel都是BroadcastChannel的是实现
launch(UI) { val bc = ConflatedBroadcastChannel>() bc.send("A") bc.send("B")// Consumer1 launch(newSingleThreadContext("threadA")) { val subscription1: SubscriptionReceiveChannel> = bc.openSubscription() subscription1.consumeEach { Log.d("KT-BROADCAST-1","${it}") } }// Consumer2 launch(newSingleThreadContext("threadB")) { val subscription2: SubscriptionReceiveChannel> = bc.openSubscription() subscription2.consumeEach { Log.d("KT-BROADCAST-2","${it}") } }delay(2000) bc.send("C") bc.send("D")Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}") }

KT-BROADCAST-1B KT-BROADCAST-2B KT-BROADCAST-1C KT-BROADCAST-2C D KT-BROADCAST-LATESTD KT-BROADCAST-1D

能够收到订阅之前最后的数据B

与Rx对比 Coroutine的Channel与Rx的Observable/Subject的对照关系
Rx Observable/Subject Coroutine Channel
Channel Observable (Hot Stream)
ArrayBroadChannel PublishSubject (Hot Stream)
ConflatedBroadcastChannel BehaviorSubject (Hot Stream)
简单总结起来:
  • Observable接口可以改造为Channel
  • Subject接口可以改造为BroadChannel
用Coroutine改造后的MVVM的Repo的大概是下面的感觉:
// Address地址 interface AddressRepository { // 订阅Address的变更通知 val addresses: BroadcastChannel>// 异步获取指定条件的Address fun filter(criteria: String): Channel>// 异步判断Address是否存在 suspend fun exists(name: String): Boolean }


最后 【Kotlin|告别RxJava(Coroutine Channel替代Rx Observable/Subject)】本文主要介绍了使用Coroutine替代Rx的Observable、Subject等接口的基本思路,当然,某些场景下RxJava借助其丰富的操作符,依然可以发挥不可替代的作用。例如短时间内收到大量数据时,可以使用debounce进行防抖;当需要处理多个异步流时使用concatcombineLatest等实现一些复杂逻辑。

    推荐阅读