Kotlin协程源码分析(二)之Channel

这章理一下channel,先分享一句学习时候看到的话:Do not communicate by sharing memory; instead, share memory by communicating.。本来好像是用在go上的,但也有着异曲同工之妙啊
channel顾名思义是管道,有入口与出口。因此最底层有sendChannel&receiveChannel
produce
Produce = Coroutine + Channel

example:
val channel: ReceiveChannel = produce(CommonPool) { for (i in 0 .. 100) { delay(1000) channel.send(i) } }launch(UI) { for (number in channel) { textView.text = "Latest number is $number" } }

produce也是产生协程,跟普通的launch不同他会返回一个receiveChannel,后面会看到receiveChannel是一个迭代器,同时会suspendhasNext和next()上,因此另一个协程就可以使用for...in...等待接受。
@ExperimentalCoroutinesApi public fun CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope.() -> Unit ): ReceiveChannel { val channel = Channel(capacity) val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine }

同时,produce发射完成后是会自己关闭的,省的我们自己关闭信道:
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) { val cause = (state as? CompletedExceptionally)?.cause val processed = _channel.close(cause) if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause) }

通过jobinvokeOnCompletion实现。
actor
example
val channel: SendChannel = actor(UI) { for (number in channel) { textView.text = "A new click happend!" } }button.setOnClickListener { launch(CommonPool) { channel.send(it) } }

produce相反返回sendChannel
高级用法
fun Node.onClick(action: suspend (MouseEvent) -> Unit) { // launch one actor to handle all events on this node val eventActor = GlobalScope.actor(Dispatchers.Main) { for (event in channel) action(event) // pass event to action } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> eventActor.offer(event) } }

我们看这里用了offer而不是send,我们可以把for..in..先简单的写成以下形式:
while(iterator.hasNext()){ //suspend fuction val event = iterator.next() //suspend function action(event) }private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine sc@ { cont -> val receive = ReceiveHasNext(this, cont) while (true) { if (channel.enqueueReceive(receive)) { channel.removeReceiveOnCancel(cont, receive) return@sc } // hm... something is not right. try to poll val result = channel.pollInternal() this.result = result if (result is Closed<*>) { if (result.closeCause == null) cont.resume(false) else cont.resumeWithException(result.receiveException) return@sc } if (result !== POLL_FAILED) { cont.resume(true) return@sc } } }

假设队列里没有东西时,enqueue一个receiveHasNext进行等待。过会解释一下channel的原理。现在只要知道,当有sender.send时,与receive关联的cont就会被调用resume,那么显而易见,当action正在处理时队列中没有receiver,而offer是不会suspend的,因此事件就被抛弃。
conflation事件合并
fun Node.onClick(action: suspend (MouseEvent) -> Unit) { // launch one actor to handle all events on this node val eventActor = GlobalScope.actor(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here for (event in channel) action(event) // pass event to action } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> eventActor.offer(event) } }

这里我们使用CONFALTED,即合并所有事件,因此接受者永远处理最近一个。原理如下:
result === OFFER_FAILED -> { // try to buffer val sendResult = sendConflated(element) when (sendResult) { null -> return OFFER_SUCCESS is Closed<*> -> { conflatePreviousSendBuffered(sendResult) return sendResult } } // otherwise there was receiver in queue, retry super.offerInternal }

offer失败时需要suspend等待,(说明还没有接受者或者人家正忙着),插入sendBuffered,同时移除前面已有的sendBuffered
var prev = node.prevNode while (prev is SendBuffered<*>) { if (!prev.remove()) { prev.helpRemove() } prev = prev.prevNode }

这样永远是最近一个生效。
大概channel原理
其实看abstractChannel会先看到一个queue,这时候显而易见会把它当做是像linkedlist那种塞数据的地方。但其实queue是用来放receive/send node。当队列为空时,send时会先从队列取第一个receiveNode,取不到就suspend,把自己当成sendNode放入; 不然就把数据直接交给receiveNode
具体channel实现时,例如ArrayChannel(buffer),会多加一个buffer队列,当队列为空时,send时会先从队列取第一个receiveNode,取不到就放入buffer队列,如果buffer队列满了,把自己当成sendNode放入就suspend; 同时把不然就把数据直接交给receiveNode
select
参考
suspend fun selectInsult(john: ReceiveChannel, mike: ReceiveChannel) { select { // means that this select expression does not produce any result john.onReceive { value ->// this is the first select clause println("John says '$value'") } mike.onReceive { value ->// this is the second select clause println("Mike says '$value'") } } }

select可以等任何一个回来,也可以等await:
fun adult(): Deferred = async(CommonPool) { // the adult stops the exchange after a while delay(Random().nextInt(2000).toLong()) "Stop it!" }suspend fun selectInsult(john: ReceiveChannel, mike: ReceiveChannel, adult: Deferred) { select { // [..] the rest is like before adult.onAwait { value -> println("Exasperated adult says '$value'") } } }

linux里的select其实类似,(能知道是哪个吗?):
final override val onReceive: SelectClause1 get() = object : SelectClause1 { override fun registerSelectClause1(select: SelectInstance, block: suspend (E) -> R) { registerSelectReceive(select, block) } }private fun registerSelectReceive(select: SelectInstance, block: suspend (E) -> R) { while (true) { if (select.isSelected) return if (isEmpty) { val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false) val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return when { enqueueResult === ALREADY_SELECTED -> return enqueueResult === ENQUEUE_FAILED -> {} // retry else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") } } else { val pollResult = pollSelectInternal(select) when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException) else -> { block.startCoroutineUnintercepted(pollResult as E, select.completion) return } } } } }

能看到onReceive是实现SelectCaluse1,同时在selectBuilderImpl环境下:
override fun SelectClause1.invoke(block: suspend (Q) -> R) { registerSelectClause1(this@SelectBuilderImpl, block) }

【Kotlin协程源码分析(二)之Channel】所以会往queueenqueue两个receive节点。
同时能看到如果任何一次select节点获取数据以后:
when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException) else -> { block.startCoroutineUnintercepted(pollResult as E, select.completion) return } }

会调用block.startCoroutineUnintercepted:
/** * Use this function to restart coroutine directly from inside of [suspendCoroutine], * when the code is already in the context of this coroutine. * It does not use [ContinuationInterceptor] and does not update context of the current thread. */ internal fun (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation) { startDirect(completion) {actualCompletion -> startCoroutineUninterceptedOrReturn(receiver, actualCompletion) } }

之前讲过startCoroutineUnintercepted其实就是function.invoke(),所以就调用block.invoke(select的completion是自己),获得值后通过uCont.resume即可。
onAwait 这个和deferedjob(Support)搞在一起:
private class SelectAwaitOnCompletion( job: JobSupport, private val select: SelectInstance, private val block: suspend (T) -> R ) : JobNode(job) { override fun invoke(cause: Throwable?) { if (select.trySelect(null)) job.selectAwaitCompletion(select, block) } override fun toString(): String = "SelectAwaitOnCompletion[$select]" }

可以看到当任务成功后,select会被继续进行
broadcast
首先解决一个问题,一个sender多个receiver是怎么处理的。
val channel = Channel() launch { val value1 = channel.receive() } launch { val value2 = channel.receive() } launch { channel.send(1) }

因为是1vs1消费。只有第一个会收到,因为它插在等待队列的第一个。用broadcast可以保证大家都收到。它维护一个subscribeuser list,所有消费者都能收到channel.sendelement
operation
map
public fun ReceiveChannel.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { consumeEach { send(transform(it)) } }

可以实现跟RX一样的操作符,接受者收到后经过转换再进行发送返回最终新的receiveChannel
hot or cold
channelhot的。
When the data is produced by the Observable itself, we call it a cold Observable. When the data is produced outside the Observable, we call it a hot Observable.
Provide abstraction for cold streams
... 这个todo,后续再说。
参考
Even smarter async with coroutine actors
官方文档

    推荐阅读