Jetpack|Kotlin 之 协程(二)启动取消协程

协程的构建器 launch和async构建器都用来启动新协程
launch,返回一个job并且不附带任何结果值
async,返回一个Deferred,Deferred也是一个job,可以使用.await()在一个延期的值上得到它的最终结果

//等待一个作业:join与await private fun runBlocking1(){ //runBlocking可以把主线程变成一个协程 //job1和job2是runBlocking的子协程 //runBlocking会等待job1和job2这两个子协程执行完毕,会阻塞主线程(阻塞:按钮按下不会立马弹起job1和job2执行完了才会弹起) runBlocking {val job1 = launch { delay(2000) Log.v("zx", "job1 to finish") }val job2 = async { delay(2000) Log.v("zx", "job2 to finish") "job2 value" } //await可以得到返回值 val job2Result = job2.await() Log.v("zx", "job2的返回值:$job2Result")}

需求:等待job1执行完毕以后再执行job2和job3
如果通过launch来启动的话,用join函数
如果通过async来启动的话,用await函数
//join和await都是挂起函数,不会阻塞主线程//如果通过launch来启动的话,用join函数 runBlocking {val job1 = launch { delay(2000) Log.v("zx", "job1 to finish") } //这个函数会等待job1执行完后才会执行后面的 job1.join()val job2 = launch { delay(100) Log.v("zx", "job2 to finish") } val job3 = launch { delay(100) Log.v("zx", "job3 to finish") }} //如果通过async来启动的话,用await函数 runBlocking {val job1 = async { delay(2000) Log.v("zx", "job1 to finish2") } //这个函数会等待job1执行完后才会执行后面的 job1.await()val job2 = async { delay(100) Log.v("zx", "job2 to finish2") } val job3 = async { delay(100) Log.v("zx", "job3 to finish2") }} }

需求:前面2个任务相加的结果给第三个任务(async结构化并发)
//runBlocking 在主线程中,子协程会继承父协程的上下文 //runBlocking是Dispatchers.Main中启动的,doOne和doTwo也会使用父协程的调度器Dispatchers.Main中启动 private fun runBlocking2() { //前面2个任务相加的结果给第三个任务(async结构化并发) runBlocking { val time = measureTimeMillis { //同步的 val one = doOne() val two = doTwo() Log.v("zx", "数据${one + two}") } Log.v("zx", "time = $time") } runBlocking { val time = measureTimeMillis { //异步的 val one = async { doOne() } val two = async { doTwo() } Log.v("zx", "数据${one.await() + two.await()}")//下面这种写法是错误的 //val one2 = async { doOne() }.await() //val two2 = async { doTwo() }.await() //Log.v("zx","数据${one2+two2}") } Log.v("zx", "asynctime = $time") } }private suspend fun doOne():Int{ delay(1000) return 1 } private suspend fun doTwo():Int{ delay(1000) return 2 }

协程的四种启动模式 CoroutineStart.DEFAULT: 协程创建后立即开始调度,调度前如果协程被取消,则执行取消
CoroutineStart.ATOMIC: 协程创建后立即开始调度,协程执行到第一个挂起点之前不响应取消
CoroutineStart.LAZY: 协程被需要时,包括主动调用协程的start,join,await等函数时才会开始调度,如果调度前被取消,则协程进入异常结束状态
CoroutineStart.UNDISPATCHED: 协程创建后立即在当前函数栈中执行,直到遇到第一个真正挂起的点
private fun runBlocking3(){ //runBlocking会等待所有子协程全部执行完 runBlocking { val job1 = launch(start = CoroutineStart.DEFAULT) { delay(3000) Log.v("zx","finished") } delay(1000) //CoroutineStart.DEFAULT则会被取消 job1.cancel()val job11 = launch(start = CoroutineStart.ATOMIC) { //delay就是第一个挂起函数,delay这里就是第一个挂起点, // 如果没执行到第一个挂起点之前取消,ATOMIC是不响应取消的 delay(3000) Log.v("zx","finished") } delay(1000) job11.cancel()val job2 = async(start = CoroutineStart.LAZY) { 20 } delay(2000) //调度前被取消,那么进入异常状态 job2.cancel() //如果是launch就用join启动,如果是async就用start或await启动 Log.v("zx","job2 ${job2.await()}")//如何实现使用Dispatchers.IO,你的协程仍然在主线程里面? //答:使用CoroutineStart.UNDISPATCHED,因为当前函数runBlocking在主线程//DISPATCHED是转发,UNDISPATCHED的意思是不转发(在主线程创建的协程,就在主线程执行) //UNDISPATCHED是立即执行,而其他的是立即调度,立即调度不代表立即执行 //立即在当前函数栈中执行,当前函数栈就是在主线程中 val job3 = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) { Log.v("zx","当前${Thread.currentThread().name}") } }}

协程的作用域构建器 coroutineScope和runBlocking
区别是
runBlocking是常规函数,而coroutineScope是挂起函数,他们都会等待子协程执行结束
runBlocking会阻塞当前线程来等待
coroutineScope只是挂起,会释放底层线程用于其他用途
coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消
supervisorScope:一个协程失败了,不会影响其他兄弟协程
coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消
private fun runBlocking4(){ //结构化并发,CoroutineScope(作用域构建器)runBlocking{ //协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕, //coroutineScope继承的父协程的协程作用域 coroutineScope { val job1 = launch { delay(500) Log.v("zx", "job1 to finish") }val job2 = async { delay(100) Log.v("zx", "job2 to finish") "job2 value" throw NullPointerException() } } } }

supervisorScope:一个协程失败了,不会影响其他兄弟协程
private fun runBlocking4(){ //结构化并发,CoroutineScope(作用域构建器)runBlocking{ //协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕, //coroutineScope继承的父协程的协程作用域 supervisorScope { val job1 = launch { delay(500) Log.v("zx", "job1 to finish") }val job2 = async { delay(100) Log.v("zx", "job2 to finish") "job2 value" throw NullPointerException() } } } }

Job对象
  • 每个创建的协程(通过launch或async)会返回一个job实例,该实例是协程的唯一标识,并负责管理协程的生命周期
  • 一个任务可以包含一系列状态:新创建(New),活跃(Active),完成中(completing),已完成(completed),取消中(Canceling),已取消(Cancelled),虽然我们无法直接访问这些状态,但是我们可以访问job的属性,isActive,isCanceled和isCompleted
job的生命周期
如果协程处于活跃状态,协程运行出错或者调用job.cancel()都会将当前任务置为取消中(isActive = false isCanceled = true),当所有子协程都完成后,协程会进入已取消状态(isCanceled = true),此时isCompleted = true
协程的取消
  • 取消作用域会取消它的子协程
  • 被取消的子协程并不会影响其他兄弟协程
  • 协程通过抛出CancellationException来处理取消操作
  • 所有kotlinx.coroutines中的挂起函数(withcontext,delay等)都是可取消的
runBlocking { //CoroutineScope自己构建一个协程作用域,不继承runBlocking父协程的上下文 val scope = CoroutineScope(Dispatchers.Default) val job1 = scope.launch { try { delay(1000) Log.v("zx", "job1") } catch (e: Exception) { e.printStackTrace() }} val job2 = scope.launch { delay(1000) Log.v("zx", "job2")}delay(100) //这里取消作用域,那么子协程就会被取消 //被取消的子协程并不会影响其他兄弟协程,所以job2打印出来了 //job1.cancel() //自定义取消异常 job1.cancel(CancellationException("我取消了")) //这里会先打印,runBlocking不会等待CoroutineScope里面的子协程执行完毕 Log.v("zx", "runBlocking") }打印: com.z.zjetpack V/zx: runBlocking com.z.zjetpack W/System.err: java.util.concurrent.CancellationException: 我取消了 com.z.zjetpack V/zx: job2

CPU密集型任务取消
isActive是一个可以使用在CoroutineScope的拓展属性,检查job是否处于活跃状态
ensureActive():如果job处于非活跃状态,这个方法会立即抛出异常
yield函数会检查所在协程状态,如果已经取消则抛出CancellationException予以响应。它还会尝试让出线程执行权,给其他协程提供执行机会。(如果此任务特别抢占系统资源,那么可以使用yield)
如下是不包含挂起函数的密集型任务
runBlocking { val startTime = System.currentTimeMillis() val job1 = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (i < 5) { //每隔0.5秒打印一次 if (System.currentTimeMillis() > nextPrintTime) { Log.v("zx", "i = ${i++}") nextPrintTime += 500 }} }Log.v("zx", "等待取消") delay(1000) //因为不存在suspend关键字的挂起函数,所以无法取消 //job1.cancel() //job1.join() //等同于上方2个方法,为什么要用join,join是等待的意思,执行cancel()方法后,不会立马取消而是进入cancelling, //即取消中,所以join方法是等待取消中变为取消完成。 job1.cancelAndJoin() Log.v("zx", "取消中") }打印: com.z.zjetpack V/zx: 等待取消 com.z.zjetpack V/zx: i = 0 com.z.zjetpack V/zx: i = 1 com.z.zjetpack V/zx: i = 2 com.z.zjetpack V/zx: i = 3 com.z.zjetpack V/zx: i = 4 com.z.zjetpack V/zx: 已取消

可以发现,我们调用了cancelAndJoin去执行取消,最终的结果是并没有取消,那么这种密集型任务怎么取消呢?
while (i < 5 && isActive)

while (i < 5) { ensureActive() ...

while (i < 5) { yield() ...

打印: com.z.zjetpack V/zx: 等待取消 com.z.zjetpack V/zx: i = 0 com.z.zjetpack V/zx: i = 1 com.z.zjetpack V/zx: i = 2 com.z.zjetpack V/zx: 已取消

上面3种方式都可以取消。
协程取消的副作用
  • 在finally种释放资源
  • use 函数:该函数只能被实现了Closeable的对象使用,程序结束时会自动调用close方法,适合文件对象。
因为协程取消了就会抛出异常,那么下面的代码就不会执行了,下面的代码有可能要释放资源,那么下面的代码不执行了,也就不会释放资源了,比如IO操作等,那么怎么处理呢?
答:在finally种释放资源,不管取不取消,finally代码块都会执行
runBlocking { val job1=launch { try { repeat(10) { Log.v("zx","sleep") delay(1000) } }finally { //不管取不取消这里都会执行 //取消后会抛出异常,不影响我释放资源 Log.v("zx","释放资源") }} delay(2000) job1.cancel() }

use函数,比如我们需要读取txt文件 普通写法:
private fun read(){ val input = assets.open("1.txt") val br = BufferedReader(InputStreamReader(input)) with(br) { var line: String? try { while (true) { line = readLine() ?: break Log.v("zx","数据$line") } } finally { close() } } }

use写法
private fun readUse(){ val input = assets.open("1.txt") val br = BufferedReader(InputStreamReader(input)) with(br) { var line: String? use { while (true) { line = readLine() ?: break Log.v("zx","数据$line") } } } }

不能取消的任务
  • 处于取消中的协程不能够挂起,当协程被取消后想要调挂起函数,需要放在withContext(NonCancellable) 中,这样会挂起运行中的代码并保持取消中状态,直到任务处理完成。
例子:
runBlocking { val job1 = launch { try { repeat(10) { Log.v("zx", "sleep") delay(1000) } } finally { Log.v("zx", "开始sleep") delay(1000) Log.v("zx", "结束sleep") }} delay(2000) job1.cancel() } 打印: com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: 开始sleep

可以发现结束sleep永远不会打印出来,那怎么办呢?
使用 withContext(NonCancellable)
runBlocking { val job1 = launch { try { repeat(10) { Log.v("zx", "sleep") delay(1000) } } finally { //如果想要协程的取消不影响这里调用挂起函数,那么需要用到 withContext(NonCancellable) // 长驻任务也可以用这个 withContext(NonCancellable) { Log.v("zx", "开始sleep") delay(1000) Log.v("zx", "结束sleep") }}} delay(2000) job1.cancel() } 打印: com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: 开始sleep com.z.zjetpack V/zx: 结束sleep

超时任务
  • 很多请求取消协程的理由是它有可能超时
  • withTimeoutOrNull通过返回null来进行超时操作,从而替代抛出一个异常
    例子
runBlocking { //需要在1秒内处理完 withTimeout(1000) { repeat(10) { Log.v("zx", "sleep") delay(500) } } }

如上,如果在一秒内没处理完,那么就会抛出异常 kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
那么如果网络请求在1秒内没返回,我们不想抛出异常,只想返回个默认值怎么办呢?
那么如果我们不想抛出异常,只想返回个null值的情况,该怎么做呢?
答:使用withTimeoutOrNull
runBlocking { //1秒内处理完,如果在1秒内没处理完,那么就返回null来代替抛出异常 val result = withTimeoutOrNull(1000) { repeat(10) { Log.v("zx", "sleep") delay(500) } "完成" } ?: "默认数据"Log.v("zx", "结果:$result") } 打印: com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: sleep com.z.zjetpack V/zx: 结果:默认数据

如上,如果在1秒内完成了,那么结果为 完成,如果没做完会返回结果null,为null即显示默认数据
协程的异常处理
runBlocking { val job1 = launch { try { throw NullPointerException() } catch (e: Exception) { log("launch,$e.toString()") }}val job2 = async { try { throw NullPointerException() } catch (e: Exception) { log("async,$e.toString()") } } job2.await()}

异常的传播特性是:当一个协程生成异常,它会传给它的父级,之后父级会取消它自己的子级,然后取消它自己,最后将异常传给它的父级。
那么如果我们想要一个子协程发送异常不影响其他协程怎么办呢?
答:使用SupervisorJob和SupervisorScope
使用SupervisorJob时,一个子协程的运行失败不会影响到它的子协程。SupervisorJob不会传播异常给它的父级,他会让子协程自己处理异常
使用CoroutineExceptionHandler捕获协程异常
val handle =CoroutineExceptionHandler { coroutineContext, throwable -> Log.v("zx","$throwable") }CoroutineScope(Dispatchers.Main).launch(handle) { throw NullPointerException() }

Android种全局异常处理
全局异常处理器可以获取到所有协程未处理的未捕获异常,不管它并不能对异常进行捕获,虽然不能阻止程序崩溃,全局异常处理器在程序调试和异常上报场景有很大作用。
我们需要在app/src/main下面创建一个resources/META-INF/services目录并在其中创建一个名为kotlinx.coroutines.CoroutineExceptionHandler的文件,文件内容就是异常处理器的全类名。
Jetpack|Kotlin 之 协程(二)启动取消协程
文章图片

package com.z.zjetpack.coroutineimport android.util.Log import kotlinx.coroutines.CoroutineExceptionHandler import kotlin.coroutines.CoroutineContextclass GException:CoroutineExceptionHandler { override val key = CoroutineExceptionHandleroverride fun handleException(context: CoroutineContext, exception: Throwable) { Log.v("zx","异常信息:$exception") } }

kotlinx.coroutines.CoroutineExceptionHandler文件中的内容为:包名+类名
com.z.zjetpack.coroutine.GException
Jetpack|Kotlin 之 协程(二)启动取消协程
文章图片

取消与异常
  • 取消与异常紧密相关,协程内部使用CancellationException来取消,这个异常会被忽略,当子协程被取消时,不会取消它的父协程
runBlocking { val job = launch { val childjob = launch { try { delay(Long.MAX_VALUE) }finally { Log.v("zx","子协程被取消了") } } //出让执行权,让子协程有机会执行 yield() Log.v("zx","开始取消") childjob.cancelAndJoin() //childjob.cancel() //没有AndJoin就会继续往下执行 //Log.v("zx","取消中。。。") yield() Log.v("zx","父协程还没被取消")//父协程中释放资源 ... } job.join() }打印: com.z.zjetpack V/zx: 开始取消 com.z.zjetpack V/zx: 子协程被取消了 com.z.zjetpack V/zx: 父协程还没被取消

  • 如果一个协程遇到了CancellationException以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后,异常才会被父协程处理。
runBlocking {val handle = CoroutineExceptionHandler { coroutineContext, throwable -> Log.v("zx","捕获异常:$throwable") }val job1 = GlobalScope.launch(handle) { val child1 = launch { try { delay(Long.MAX_VALUE) }finally { //这里如果要执行挂起函数要用NonCancellable withContext(NonCancellable) { Log.v("zx","child1子协程已被取消,但异常未被处理") delay(100) Log.v("zx","child1子协程已完成") }} } val child2 = launch { delay(10) Log.v("zx","child2 抛出异常") throw NullPointerException()} } job1.join() }打印: com.z.zjetpack V/zx: child2 抛出异常 com.z.zjetpack V/zx: child1子协程已被取消,但异常未被处理 com.z.zjetpack V/zx: child1子协程已完成 com.z.zjetpack V/zx: 捕获异常:java.lang.NullPointerException

异常聚合
【Jetpack|Kotlin 之 协程(二)启动取消协程】当协程的多个子协程因为异常而失败时,一般取第一个异常处理,在第一个异常后发生的所有异常都会绑定到第一个异常上。
runBlocking {val handle = CoroutineExceptionHandler { coroutineContext, throwable -> Log.v("zx", "其他异常:${throwable.suppressed.contentToString()}") Log.v("zx", "当前捕获异常:$throwable") }GlobalScope.launch(handle) { launch { try { delay(Long.MAX_VALUE) } finally { throw NullPointerException() }} launch { try { delay(Long.MAX_VALUE) } finally { throw IndexOutOfBoundsException() }} launch { delay(100) throw ArithmeticException() } } }打印: 其他异常:[java.lang.NullPointerException, java.lang.IndexOutOfBoundsException] 当前捕获异常:java.lang.ArithmeticException

    推荐阅读