Kotlin Channels in Practice

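  • Channels
    • Getting to know Channel
    • Capacity and iteration
    • produce and actor
    • Closing a Channel
    • BroadcastChannel
  • Multiplexing
    • What is multiplexing
    • Multiplexing multiple awaits
    • Multiplexing multiple Channels
    • SelectClause
    • Multiplexing with Flow
  • Concurrency safety
    • Concurrency tools for coroutines
    • Mutex
    • Semaphore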
Getting to know Channel
A Channel is essentially a concurrency-safe queue. It connects coroutines and lets them communicate with each other.

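Here is a producer-consumer example:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    import org.junit.Test

    @Test
    fun testChannel() = runBlocking {
        val channel = Channel<Int>()
        // Producer: sends one element per second
        val producer = GlobalScope.launch {
            var i = 1
            while (true) {
                delay(1000)
                println("sending $i")
                channel.send(i++)
            }
        }
        // Consumer: receives each element as soon as it is sent
        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                println("receive: $element")
            }
        }
        joinAll(producer, consumer)
    }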

The producer sends one element per second, and the consumer receives it immediately.
Channel capacity
A Channel is a queue, and a queue has a buffer. Once the buffer is full and nobody calls receive to take an element out, send has to suspend. If the receiver is deliberately slowed down, send suspends every time and resumes only after a receive.
    public fun <E> Channel(
        capacity: Int = RENDEZVOUS,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
        onUndeliveredElement: ((E) -> Unit)? = null
    ): Channel<E>

The default capacity of a Channel is RENDEZVOUS, whose value is 0. If the consumer in the example above is slowed down, the producer follows the consumer's pace: it can hand over the next element only when the consumer takes one, so send always suspends until the consumer receives.
    @Test
    fun testChannel() = runBlocking {
        val channel = Channel<Int>()
        val start = System.currentTimeMillis()
        // Producer: ready to send every second
        val producer = GlobalScope.launch {
            var i = 1
            while (true) {
                delay(1000)
                println("sending $i, ${System.currentTimeMillis() - start}")
                channel.send(i++)
            }
        }
        // Consumer: deliberately slow, receives every 5 seconds
        val consumer = GlobalScope.launch {
            while (true) {
                delay(5000)
                val element = channel.receive()
                println("receive: $element, ${System.currentTimeMillis() - start}")
            }
        }
        joinAll(producer, consumer)
    }

    sending 1, 1018
    receive: 1, 5017
    sending 2, 6022
    receive: 2, 10021
    sending 3, 11024
    receive: 3, 15026
    ...

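Because the buffer size defaults to 0, the producer has to wait for the consumer to take each element before it can produce the next one.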
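The effect of capacity can also be observed without any timing by using trySend, which never suspends and only reports whether the element was accepted. The following is a minimal sketch; the helper name bufferedSends is hypothetical and not part of the original example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to send `attempts` elements with no receiver present
// and records, for each attempt, whether the channel accepted the element.
fun bufferedSends(capacity: Int, attempts: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return (1..attempts).map { channel.trySend(it).isSuccess }
}

fun main() {
    // RENDEZVOUS (capacity 0): nothing is accepted without a waiting receiver.
    println(bufferedSends(capacity = Channel.RENDEZVOUS, attempts = 1)) // [false]
    // Capacity 2: two elements fit in the buffer, the third is rejected.
    println(bufferedSends(capacity = 2, attempts = 3)) // [true, true, false]
}
```

A suspending send would wait at exactly the points where trySend reports a failure here.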
Iterating a Channel
A Channel behaves like a sequence, so when reading you can obtain its iterator directly.
    @Test
    fun testChannelIterator() = runBlocking {
        //val channel = Channel<Int>()
        val channel = Channel<Int>(Channel.UNLIMITED)
        val start = System.currentTimeMillis()
        // Producer
        val producer = GlobalScope.launch {
            for (i in 1..5) {
                println("sending $i, ${System.currentTimeMillis() - start}")
                channel.send(i)
            }
            // Close the channel so hasNext() returns false once the buffer is drained
            channel.close()
        }
        // Consumer
        val consumer = GlobalScope.launch {
            val it = channel.iterator()
            while (it.hasNext()) {
                val element = it.next()
                println("receive: $element, ${System.currentTimeMillis() - start}")
                delay(2000)
            }
        }
        joinAll(producer, consumer)
    }

    sending 1, 8
    sending 2, 12
    sending 3, 12
    sending 4, 12
    sending 5, 12
    receive: 1, 15
    receive: 2, 2023
    receive: 3, 4026
    receive: 4, 6031
    receive: 5, 8037

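This is how iteration works. With the buffer set to UNLIMITED, the producer sends all 5 elements at once, and the consumer takes them one by one at its own pace. With the default capacity the behaviour matches the previous example: an element is produced only after the previous one has been consumed.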
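The explicit iterator can usually be replaced by a for loop, which uses the same iterator under the hood and ends when the channel is closed. A minimal sketch, assuming the producer closes the channel when it is done (the function name collectAll is hypothetical):

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: sends 1..5 through an unlimited channel and
// collects everything the consumer loop receives.
fun collectAll(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    launch {
        for (i in 1..5) channel.send(i)
        channel.close() // without this, the loop below would suspend forever
    }
    val received = mutableListOf<Int>()
    for (element in channel) received += element
    received
}

fun main() {
    println(collectAll()) // [1, 2, 3, 4, 5]
}
```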
produce and actor
  • Convenient builders for producers and consumers.
  • produce starts a producer coroutine and returns a ReceiveChannel that other coroutines can use to receive data. Conversely, actor starts a consumer coroutine.
In the example below, produce creates a receiveChannel, and a second coroutine consumes its elements.
    @Test
    fun testProducer() = runBlocking {
        val receiveChannel = GlobalScope.produce<Int>(capacity = 50) {
            repeat(5) {
                delay(1000)
                println("produce $it")
                send(it)
            }
        }
        val consumer = GlobalScope.launch {
            for (i in receiveChannel) {
                delay(3000)
                println("consume: $i")
            }
        }
        consumer.join()
    }

    produce 0
    produce 1
    produce 2
    consume: 0
    produce 3
    produce 4
    consume: 1
    consume: 2
    consume: 3
    consume: 4

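The signature of produce is shown below; the capacity defaults to 0. Without an explicit capacity, the example above would alternate strictly: produce one element, consume one element. With a capacity of 50 the producer can run ahead. Because the consumer spends 3 seconds on each element while the producer emits one per second, the producer emits about 3 elements during each 3-second delay.
    public fun <E> CoroutineScope.produce(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0,
        @BuilderInference block: suspend ProducerScope<E>.() -> Unit
    ): ReceiveChannel<E>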
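Because produce is an extension on CoroutineScope, it can also be wrapped in a function that returns the ReceiveChannel. The sketch below is one possible arrangement, not the article's own code; the names squares and collectSquares are hypothetical. It relies on the channel being closed automatically when the producer block finishes.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// Hypothetical producer: emits the squares of 1..n, then completes,
// which closes the returned channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(n: Int): ReceiveChannel<Int> = produce {
    for (i in 1..n) send(i * i)
}

// Hypothetical consumer: the for loop ends once the producer has finished.
fun collectSquares(n: Int): List<Int> = runBlocking {
    val out = mutableListOf<Int>()
    for (sq in squares(n)) out += sq
    out
}

fun main() {
    println(collectSquares(4)) // [1, 4, 9, 16]
}
```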

Now look at actor:
    public fun <E> CoroutineScope.actor(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
        start: CoroutineStart = CoroutineStart.DEFAULT,
        onCompletion: CompletionHandler? = null,
        block: suspend ActorScope<E>.() -> Unit
    ): SendChannel<E>

actor creates a SendChannel; another coroutine then uses it to send elements. For example:
    @Test
    fun testActor() = runBlocking {
        // Consumer coroutine: receives whatever is sent to sendChannel
        val sendChannel = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println("receive: $element")
            }
        }

        val producer = GlobalScope.launch {
            for (i in 1..3) {
                println("send: $i")
                sendChannel.send(i)
            }
        }

        producer.join()
    }
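An actor is often used to keep state inside a single coroutine. The following sketch, under the assumption that the sender closes the actor when it has nothing more to send, sums the received values; sumWithActor and the CompletableDeferred used to hand back the result are illustrative choices, not part of the original example.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.actor

// Hypothetical helper: the actor owns `sum`, so no other coroutine touches it.
@OptIn(ObsoleteCoroutinesApi::class)
fun sumWithActor(values: List<Int>): Int = runBlocking {
    val result = CompletableDeferred<Int>()
    val adder = actor<Int> {
        var sum = 0
        for (v in channel) sum += v // ends when the actor's channel is closed
        result.complete(sum)
    }
    values.forEach { adder.send(it) }
    adder.close() // signals that no more elements will be sent
    result.await()
}

fun main() {
    println(sumWithActor(listOf(1, 2, 3))) // 6
}
```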

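Closing a Channel
  • The channels returned by produce and actor are closed when the corresponding coroutine finishes. This is also why a Channel is called a hot data stream.
  • Calling close on a Channel immediately stops it from accepting new elements, so isClosedForSend returns true right away. Because of the buffer, some elements may still be unprocessed at that point, so isClosedForReceive returns true only after every buffered element has been read.
  • The lifecycle of a Channel is best managed by the side that drives the communication, and that side should be the one to close it.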
    @Test
    fun testClose() = runBlocking {
        val channel = Channel<Int>(3)
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            channel.close()
            println("close channel. closeForSend: ${channel.isClosedForSend}, closeForReceive: ${channel.isClosedForReceive}")
        }
        val consumer = GlobalScope.launch {
            for (e in channel) {
                println("receive: $e")
                delay(1000)
            }
            println("after consuming. closeForSend: ${channel.isClosedForSend}, closeForReceive: ${channel.isClosedForReceive}")
        }
        joinAll(producer, consumer)
    }

    send 0
    send 1
    send 2
    receive: 0
    close channel. closeForSend: true, closeForReceive: false
    receive: 1
    receive: 2
    after consuming. closeForSend: true, closeForReceive: true

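In this example the consumer takes 1 second to process each element. During that time the producer sends all 3 elements and closes the Channel. At that moment only one element has been consumed, so closeForSend is true and closeForReceive is false. Once all elements have been consumed, both are true.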
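The same two states can be checked in a single coroutine, without relying on timing. This is a minimal sketch (closeStates is a hypothetical name): it fills the buffer, closes the channel, and records both flags before and after draining. The opt-in annotations are included because these properties are marked as experimental or delicate, depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: returns (isClosedForSend, isClosedForReceive)
// right after close() and again after the buffer has been drained.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun closeStates(): List<Pair<Boolean, Boolean>> = runBlocking {
    val channel = Channel<Int>(3)
    repeat(3) { channel.send(it) } // fits in the buffer, so send does not suspend
    channel.close()
    val states = mutableListOf(channel.isClosedForSend to channel.isClosedForReceive)
    val drained = mutableListOf<Int>()
    for (e in channel) drained += e // buffered elements are still delivered
    states += channel.isClosedForSend to channel.isClosedForReceive
    states
}

fun main() {
    println(closeStates()) // [(true, false), (true, true)]
}
```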
BroadcastChannel
A Channel can have one sender and several receivers, but each element is delivered to only one of them. Broadcasting is different: the receivers do not compete, and every receiver gets every element.
    @Test
    fun testBroadcast() = runBlocking {
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        // Three subscribers, each with its own subscription
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("#$index received $i")
                }
            }
        }.joinAll()
    }

    #2 received 0
    #0 received 0
    #1 received 0
    #1 received 1
    #0 received 1
    #2 received 1
    #1 received 2
    #0 received 2
    #2 received 2

The example creates a BroadcastChannel, starts several subscriber coroutines, and broadcasts data; every subscriber receives every element.
A Channel can also be converted into a BroadcastChannel, with the same result:
    @Test
    fun testBroadcast2() = runBlocking {
        //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val channel = Channel<Int>()
        val broadcastChannel = channel.broadcast(Channel.BUFFERED)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("#$index received $i")
                }
            }
        }.joinAll()
    }

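What is multiplexing
In data communication and computer network systems, the bandwidth or capacity of a transmission medium often exceeds what a single signal needs. To use the line efficiently, one channel carries several signals at the same time. This technique is called multiplexing.
Multiplexing multiple awaits
Two APIs fetch data from the network and from the local cache respectively, and whichever returns first is used for display.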

An example:
    import kotlinx.coroutines.selects.select

    private fun CoroutineScope.getUserFromLocal(name: String) = async {
        delay(1000)
        "local $name"
    }

    private fun CoroutineScope.getUserFromRemote(name: String) = async {
        delay(2000)
        "remote $name"
    }

    data class Response<T>(val value: T, val isLocal: Boolean = false)

    @Test
    fun testSelectAwait() = runBlocking {
        GlobalScope.launch {
            val localUser = getUserFromLocal("Jack")
            val remoteUser = getUserFromRemote("Mike")
            // select resumes with whichever Deferred completes first
            val userResponse = select<Response<String>> {
                localUser.onAwait { Response(it, true) }
                remoteUser.onAwait { Response(it, false) }
            }
            println("render on UI: ${userResponse.value}")
        }.join()
    }

    render on UI: local Jack

The local call has the shorter delay and returns first, so select picks the local data. If the remote delay is made shorter, select picks the remote data instead.
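Once select has returned, the slower request is usually no longer needed. The sketch below makes the delays parameters and cancels the loser; the function name fastest and the delay values are assumptions chosen for illustration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: races two simulated sources and returns the first result.
fun fastest(localDelay: Long, remoteDelay: Long): String = runBlocking {
    val local = async { delay(localDelay); "local" }
    val remote = async { delay(remoteDelay); "remote" }
    val winner = select<String> {
        local.onAwait { it }
        remote.onAwait { it }
    }
    // Cancelling a Deferred that has already completed has no effect,
    // so it is safe to cancel both.
    local.cancel()
    remote.cancel()
    winner
}

fun main() {
    println(fastest(localDelay = 50, remoteDelay = 500)) // local
    println(fastest(localDelay = 500, remoteDelay = 50)) // remote
}
```

Without the cancel calls, runBlocking would still wait for the slower coroutine to finish before returning.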
Multiplexing multiple Channels
As with await, select receives from whichever Channel delivers first. See the example below:
    @Test
    fun testSelectChannel() = runBlocking {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(100)
        }
        GlobalScope.launch {
            delay(50)
            channels[1].send(50)
        }
        // Register an onReceive clause for every channel
        val result = select<Int> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println("result $result")
        delay(1000)
    }

    result 50

Two Channels each send an element, the first after 100 ms and the second after 50 ms. select receives the element that was sent after 50 ms.
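It is often useful to know which Channel won, not only the value. A possible sketch, with the hypothetical helper firstOf, returns the index of the winning channel together with the element and then cancels the senders that are still waiting:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select

// Hypothetical helper: one rendezvous channel per delay; each sender sends its own delay.
fun firstOf(delays: List<Long>): Pair<Int, Long> = runBlocking {
    val channels = delays.map { Channel<Long>() }
    val senders = delays.mapIndexed { i, d ->
        launch { delay(d); channels[i].send(d) }
    }
    val result = select<Pair<Int, Long>> {
        channels.forEachIndexed { i, ch ->
            ch.onReceive { i to it } // tag the value with the channel index
        }
    }
    senders.forEach { it.cancel() } // the remaining senders would otherwise stay suspended
    result
}

fun main() {
    println(firstOf(listOf(300L, 50L))) // (1, 50)
}
```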
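SelectClause
  • How do we know which events can be used in select? Every selectable event is of a SelectClauseN type:
    • SelectClause0: the event has no return value. For example, join returns nothing, so onJoin is a SelectClause0; its handler is a function with no parameters.
    • SelectClause1: the event has a return value. onAwait and onReceive above are both of this kind.
    • SelectClause2: the event has a return value and also takes an extra argument. For example, Channel.onSend takes two arguments: the first is the value to send, of the Channel's element type, and the second is the callback invoked when the send succeeds.
  • To check whether a suspending function supports select, look for a corresponding member of type SelectClauseN.
An example with a no-parameter handler: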
    @Test
    fun testSelectClause0() = runBlocking {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }

        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        // onJoin is a SelectClause0, so its handler takes no parameter
        select<Unit> {
            job1.onJoin { println("job 1 onJoin") }
            job2.onJoin { println("job 2 onJoin") }
        }
        delay(1000)
    }

    job 2
    job 2 onJoin
    job 1

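Two coroutines, job1 and job2, are started. job2 has the shorter delay and prints first, so select picks job2 and prints "job 2 onJoin". Neither coroutine returns a value (in other words, the result is Unit), so Unit is declared as the type argument of select.
Next, an example with two arguments: the first is the value, and the second is the callback.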
    @Test
    fun testSelectClause2() = runBlocking {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit?> {
                launch {
                    delay(10)
                    // First argument: the value to send; second: the callback on success
                    channels[1].onSend(10) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }
            }
        }
        GlobalScope.launch { println(channels[0].receive()) }
        GlobalScope.launch { println(channels[1].receive()) }
        delay(1000)
    }

    [RendezvousChannel@78aab498{EmptyQueue}, RendezvousChannel@7ee955a8{EmptyQueue}]
    10
    sent on RendezvousChannel@7ee955a8{EmptyQueue}

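There are two Channel objects, 78aab498 and 7ee955a8. Two coroutines are started with delays of 10 ms and 100 ms; they use onSend to send 10 and 100 respectively, with the callback as the second argument.
Two more coroutines then receive from the two Channels. The output shows that the send from the coroutine with the shorter delay was selected.
Returning to what "a corresponding member of type SelectClauseN" means: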
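A more compact variant registers both onSend clauses directly in the select builder and lets the receivers decide which send goes through. This is a sketch under assumed receiver delays (50 ms and 300 ms); the function name sendToFirstReady is hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select

// Hypothetical helper: returns the index of the channel whose send succeeded.
fun sendToFirstReady(): Int = runBlocking {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    // Assumed receivers: the one on channels[1] becomes ready first.
    val r0 = launch { delay(300); channels[0].receive() }
    val r1 = launch { delay(50); channels[1].receive() }
    val chosen = select<Int> {
        // onSend(value) { callback }: the callback runs only for the send that succeeded
        channels[0].onSend(100) { 0 }
        channels[1].onSend(10) { 1 }
    }
    r0.cancel() // this receiver would otherwise wait for an element that is never sent
    r1.cancel()
    chosen
}

fun main() {
    println(sendToFirstReady()) // 1
}
```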
    public val onSend: SelectClause2<E, SendChannel<E>>

    public val onJoin: SelectClause0

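The source shows that onSend and onJoin are both declared with a SelectClauseN type, so both can be used in select.
Multiplexing with Flow
In most cases, the effect of multiplexing can also be achieved by constructing a suitable Flow.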
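One way to build such a Flow is the merge function from kotlinx.coroutines.flow, which emits elements from several flows as they arrive. The sketch below is an illustration rather than the article's own code: userSources and the two simulated delays are assumptions that mirror the earlier local/remote scenario.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList

// Hypothetical sources: a fast local lookup and a slower remote one.
fun userSources(): Flow<String> = merge(
    flow { delay(100); emit("local Jack") },
    flow { delay(300); emit("remote Jack") }
)

fun main() = runBlocking {
    // first() takes the earliest element and cancels the remaining sources.
    println(userSources().first())  // local Jack
    // toList() collects every element in arrival order.
    println(userSources().toList()) // [local Jack, remote Jack]
}
```

Taking first() corresponds to the select examples above, while collecting the whole flow lets the UI render the local result and then refresh with the remote one.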
