在介绍了大部分概念,现在需要针对这些概念进行详细的说明以及分析。
协程是轻量级的线程
协程概念
非抢占式或协作式
的计算机并发调度的实现,程序可以主动挂起或者恢复执行,
避免在异步程序中使用大量的回调,使用阻塞的方式写出非阻塞的代码。
一种全新处理并发的方式,可以在Android平台上简化异步执行的代码。
协程
主要用来解决两个问题:
在后面的原理介绍中,会介绍与这两个问题相关的概念。
概念介绍
在执行过程中会涉及的一些概念,在后续源码分析也会涉及。
协程体
协程中要执行的操作,是一个被suspend
修饰的lambda表达式
挂起函数
由suspend
修饰的函数,只能在挂起函数
或者协程体
中调用。可以通过调用其他挂起函数
挂起执行代码,而不阻塞当前执行线程。
挂起点
一般对应挂起函数
被调用的位置
续体-Continuation
挂起的协程在挂起点
时的状态。概念上表示挂起点之后的剩余应该执行的代码
。
核心类 协程作用域—CoroutineScope
追踪每一个通过launch
或async
创建的协程。并且任何时候都可以通过scope.cancel()
取消正在执行的协程。
可以通过CoroutineScope
控制协程的生命周期,当Activity/Fragment关闭时,调用cancel()
及时关闭。
协程作用域
主要有三种:
阻塞协程作用域 调用runBlocking()
的线程会被阻塞直到内部协程任务执行完毕。
全局协程作用域 GlobalScope
作用于整个应用的生命周期,并且无法被取消,在界面上使用时,就会导致内存泄漏。
1 2 3 4 5 6 7 8 9 10 public object GlobalScope : CoroutineScope { override val coroutineContext: CoroutineContext get () = EmptyCoroutineContext } public interface CoroutineScope { public val coroutineContext: CoroutineContext }
自定义作用域 自定义协程作用域,可以针对性的控制避免内存泄漏。
1 2 val coroutineContext : CoroutineContext = Dispatchers.Main + Job()val coroutineScope = CoroutineScope(coroutineContext)
需要自定义协程作用域
时,需要构造一个CoroutineContext
作为参数。
CoroutineContext
会在下面重点介绍。
内置的自定义作用域 MainScope
为了方便开发使用,Kotlin标准库提供了MainScope
用于快速生成CoroutineScope
1 public fun MainScope () : CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
viewModelScope
在AndroidX中引入的viewMdoelScope
,在ViewModel销毁时会自动取消协程任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val ViewModel.viewModelScope: CoroutineScope get () { val scope: CoroutineScope? = this .getTag(JOB_KEY) if (scope != null ) { return scope } return setTagIfAbsent(JOB_KEY, CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)) }internal class CloseableCoroutineScope (context: CoroutineContext) : Closeable, CoroutineScope { override val coroutineContext: CoroutineContext = context override fun close () { coroutineContext.cancel() } }
协程上下文-CoroutineContext
一组定义协程行为的元素,本体是一个数据结构,类似于Map
,内部实现为单链表
由如下几项构成:
Job:执行的任务
CoroutineDispatcher:协程调度器
CoroutineName:协程的名称,主要用于调试
CoroutineExceptionHandler:处理未被捕获的异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface CoroutineContext { public operator fun <E : Element> get (key: Key <E >) : E? public fun <R> fold (initial: R , operation: (R , Element ) -> R ) : R public operator fun plus (context: CoroutineContext ) : CoroutineContext = if (context === EmptyCoroutineContext) this else context.fold(this ) { acc, element -> val removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else { val interceptor = removed[ContinuationInterceptor] if (interceptor == null ) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } } public fun minusKey (key: Key <*>) : CoroutineContext }
自定义CoroutineContext 1 val coroutineContext : CoroutineContext = Dispatchers.Main + Job() + CoroutineName("name" )
CoroutineContext
通过+
进行元素的合并,+
右侧的元素会覆盖左侧的元素。
CoroutineContext
存储方式为左向链表
,链表的每一个节点都是CombinedContext
,当存在协程拦截器
时,永远处于链表的最后。
经过上述的plus
操作后,最后得到一个完整的CoroutineContext
对象。
CoroutineContext的父子关系
每个协程都会有一个父对象,协程的父级CoroutineContext
和父协程的CoroutineContext
是不一致的。
父级上下文 = 默认值 + 继承的CoroutineContext
+参数
默认值
:一些元素包含的默认值,例如默认Dispatcher
就是Dispatchers.Default
继承的CoroutineContext
:父协程的CoroutineContenxt
参数
:后续子协程配置的参数,如上文所示组成部分,新添加的参数会覆盖前面的对应配置。
协程执行任务-Job
用于处理协程,封装了协程需要执行的代码逻辑,并且负责管理协程的生命周期。
通过协程构造器
创建的协程都会返回一个Job实例
。
主要有以下几种生命周期:
New
新建任务
Active
任务活跃
Completing
任务完成中
Cancelling
任务取消中
Cancelled
任务已取消
Completed
任务已完成
Job
内提供了isActive()
、isCancelled()
和isCompleted()
等属性用于判断协程的状态。
协程取消 会更多的分析Job
相关。
协程调度器-CoroutineDispatcher
Dispatchers
是协程中提供的线程调度器
,用来切换线程,指定协程运行的线程。
默认提供了四种调度器
Dispatchers.Default
默认调度器,适合处理后台运算,为CPU密集型
任务调度器
Dispatchers.IO(仅JVM可用)
适合执行IO相关操作,例如读写文件
等,为IO密集型
任务调度器
Dispatchers.Main
UI调度器,根据执行平台的不同会初始化为对应平台的UI线程调度器。
在Android中,就会通过Handler
调度任务到UI线程
执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private fun loadMainDispatcher () : MainCoroutineDispatcher { return try { val factories = if (FAST_SERVICE_LOADER_ENABLED) { FastServiceLoader.loadMainDispatcherFactory() } else { ServiceLoader.load( MainDispatcherFactory::class .java , MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf" ) factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) { createMissingDispatcher(e) } } internal fun loadMainDispatcherFactory () : List<MainDispatcherFactory> { val clz = MainDispatcherFactory::class .java if (!ANDROID_DETECTED) { return load(clz, clz.classLoader) } return try { val result = ArrayList<MainDispatcherFactory>(2 ) createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory" )?.apply { result.add(this ) } createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory" )?.apply { result.add(this ) } result } catch (e: Throwable) { load(clz, clz.classLoader) } }
按照类名去加载,Android下的名为kotlinx.coroutions.android.AndroidDispatcherFactory
的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 internal class AndroidDispatcherFactory : MainDispatcherFactory { override fun createDispatcher (allFactories: List <MainDispatcherFactory >) = HandlerContext(Looper.getMainLooper().asHandler(async = true ), "Main" ) }internal class HandlerContext private constructor ( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { public constructor ( handler: Handler, name: String? = null ) : this (handler, name, false ) override fun isDispatchNeeded (context: CoroutineContext ) : Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch (context: CoroutineContext , block: Runnable ) { handler.post(block) } ... }
Dispatchers.Main.immediate
适用于响应一个UI事件后从而启动一个协程时
,会在下一帧中去立即执行任务。
Dispatchers.Unconfined
非限制的调度器,在遇到第一个挂起函数前的代码运行在原线程中,执行挂起函数后,就切换线程运行。
自定义调度器
Default
和IO
的底层实现都依赖于线程池
,执行到挂起函数
时还是会发生线程的切换,可以通过自定义调度器
减少这类切换的发生。
1 val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread" ) }.asCoroutineDispatcher()
协程拦截器-ContinuationInterceptor
ContinuationInterceptor
是一个拦截器的接口定义,用于控制协程的执行流程。
在CoroutineContext
中,实现了ContinuationInterceptor
接口的类,永远会处于最后一位,保证不会被其他类覆盖。
协程拦截器只能存在一个!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface ContinuationInterceptor : CoroutineContext.Element { companion object Key : CoroutineContext.Key<ContinuationInterceptor> public fun <T> interceptContinuation (continuation: Continuation <T >) : Continuation<T> public fun releaseInterceptedContinuation (continuation: Continuation <*>) { ... }
其中CoroutineDispatcher
就是基于ContinuationInterceptor
所实现的。
协程异常处理-CoroutineExceptionHandler
所有未被捕获的异常一定会抛出,无论使用哪种Job!!!
当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给他的父级。
主要执行以下几步:
取消它的子级任务
取消自己的任务
把异常继续向上传递到自己的父级
SupervisorJob 使用Job
时,若发生异常会导致异常传递,使得所有的任务都会被取消。
使用SupervisorJob
,一个子协程运行失败不会传播异常,只会影响自身
,其他任务都不会受到影响。
SupervisorJob只有在supervisorScope
或CoroutineScope(SupervisorJob())
内执行才可以生效。
CoroutineScope(SupervisorJob())
1 2 3 4 5 6 7 8 val scope = CoroutineScope(SupervisorJob()) scope.launch{ } scope.launch{ }
supervisorScope
1 2 3 4 5 6 7 8 supervisorScope { launch { throw NullPointerException("123" ) } launch { System.err.println(3 ) } }
使用这两种方式都可以保证异常不向上传播
Job VS SupervisorJob
如果想在出现错误时不会退出父级和其他平级的协程,就要使用SupervisorJob
或supervisorScope
局部异常捕获 根据不同的协程构造器
,处理方式也不尽相同
launch()
主要采用try{}catch{}
的形式进行异常捕获
1 2 3 4 5 6 7 scope.launch { try { codeThatCanThrowExceptions() } catch (e: Exception) { } }
launch()
时,异常会在第一时间被抛出。
async/await()
只有当async()
作为根协程时,不会自动抛出异常,而是要等到await()
执行时才抛出异常。
根协程
:coroutintScope
或supervisorScope
的直接子协程,或者类似scope.async()
这种实现。
这种情况下可以通过try{}catch{}
捕获异常
1 2 3 4 5 6 coroutineScope{ val deferred = async{ throw NullPointerException("123" ) } }
此时不会执行任何
只有在调用.await()
时才会抛出异常,此时就可以添加try{}catch{}
捕获异常。
针对async()
这种情况,最有效的方式就是async()
内部进行try{}catch{}
全局异常捕获
类似Java,协程也提供了捕获全局异常(未声明捕获异常
)的方式
Java的全局异常捕获方式
1 2 3 4 5 6 Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException (Thread t, Throwable e) { } });
协程内全局异常捕获方式 主要使用的是CoroutineExceptionHandler
,可以帮助处理一些未捕获的异常
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> log("Throws an exception with message: ${throwable.message} " ) }val context = Dispatchers.Main + Job() + exceptionHandlerval scope = CoroutineScope(context)fun main () { scope.launch { launch { throw NullPointerException("1234" ) } delay(1000 ) } }
此时就会捕获到NPE
。
需要CoroutineExceptionHandler
生效需要两个条件:
异常是被自动抛出异常的协程所抛出的。(只能是 launch(),async()这种是不可以的 )
必须在根协程
中,coroutintScope
或supervisorScope
的直接子协程,或者类似scope.async()
这种实现。
真·全局异常捕获 上面说到的CoroutineExceptionHandler
只能在协程内部使用,无法兼顾其他协程的异常情况。此时就需要使用另一种方式,使用ServiceLoader
实现全局内协程异常捕获
实现这个功能需要如下几步:
新建全局CoroutineExceptionHandler
类
在classPath
中注册该类
在src/main/
目录下的,resources/META-INF/services
文件夹
新建kotlinx.coroutines.CoroutineExceptionHandler
文件
文件内写入自定义的全局CoroutineExceptionHandler
完整类名
同样这种配置方式也只对launch()生效。
这里主要应用了SPI机制
全称为Service Provider Interface
,JDK内置的一种服务提供发现机制,主要源码实现在java.util.ServiceLoader
使用过程:
需要在resources/META-INF/services
目录下创建与服务同名的全限定名 相同的文件,然后在文件中写入服务提供者的全限定名 。
原理简介:
主要通过反射调用配置的类进行实例化,反射成功后存入缓存,后续使用直接从缓存重新读取。
总结
协程内部异常处理流程
异常传播在不同作用域的表现
GlobalScope
:异常不会向外传递,因为已经是根协程
coroutineScope
:异常进行双向传递
,父协程和子协程都会被取消
supervisorScope
:异常进行单向传递
,只有父协程向子协程传递异常,子协程会被取消,父协程不受影响
launch/join
和async/await
表现不同
launch/join
关注的是任务是否执行完成 ,async/await
关注的是任务的执行结果 ,所以在局部异常捕获的时候,两种创建方式的异常捕获也会有区别
想要避免异常传播,就要使用SupervisorJob
;不在意就用Job
协程构造器-CoroutineBuilder
主要负责构造一个协程并启动它
常用的有两种方法
launch(重点分析)
默认创建一个新的协程,并返回Job
对象,通过Job
管理协程。
1 2 3 4 5 6 7 8 9 10 11 12 public fun CoroutineScope.launch ( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope .() -> Unit ) : Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine }
主要有三个参数:
context
:就是前面介绍的CoroutineContext
start
:协程启动模式
block
:需要执行的任务,由suspend
修饰
newCoroutineContext
将传参的context
与ContextScope
配置的context
进行合并,并返回一个新的context
。
1 2 3 4 5 6 public actual fun CoroutineScope.newCoroutineContext (context: CoroutineContext ) : CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null ) debug + Dispatchers.Default else debug }
StandaloneCoroutine/LazyStandaloneCoroutine 1 2 3 4 5 6 7 8 9 private open class StandaloneCoroutine ( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit >(parentContext, active) { override fun handleJobException (exception: Throwable ) : Boolean { handleCoroutineException(context, exception) return true } }
继承AbstractCoroutine
且重写了handleJobException()
,这也是为什么CoroutineExceptionHandler
可以监听到异常的原因。
1 2 3 4 5 6 7 8 9 10 11 12 private class LazyStandaloneCoroutine ( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false ) { private var block: (suspend CoroutineScope.() -> Unit )? = block override fun onStart () { val block = checkNotNull(this .block) { "Already started" } this .block = null block.startCoroutineCancellable(this , this ) } }
LazyStandaloneCoroutine
重写了onStart()
,只有在调用到start()/join()
等方法才会执行。
start()
启动协程任务
1 2 3 4 5 public fun <R> start (start: CoroutineStart , receiver: R , block: suspend R .() -> T ) { initParentJob() start(block, receiver, this ) }
调用到CoroutineStart.invoke()
1 2 3 4 5 6 7 8 public operator fun <R, T> invoke (block: suspend R .() -> T , receiver: R , completion: Continuation <T >) = when (this ) { CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion) CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion) CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) CoroutineStart.LAZY -> Unit }
默认使用CoroutineStart.DEFAULT
,以这个作为示例分析
1 2 3 4 5 6 7 internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion) .intercepted() .resumeCancellable(Unit ) }
主要流程分为三步:
createCoroutineUninterecpted 1 2 3 4 5 6 7 8 9 10 11 12 13 @SinceKotlin("1.3" ) public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit > { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
主要是为了创建Continuation
对象
intercepted 1 2 3 4 5 6 7 8 9 public actual fun <T> Continuation<T> .intercepted () : Continuation<T> = (this as ? ContinuationImpl)?.intercepted() ?: this public fun intercepted () : Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this ) ?: this ) .also { intercepted = it }
如果设置了ContinutionInterceptor
,就获取并执行interceptContinuation()
。
resumeCancellable 1 2 3 4 internal fun <T> Continuation<T> .resumeCancellable (value: T ) = when (this ) { is DispatchedContinuation -> resumeCancellable(value) else -> resume(value) }
进行线程调度或者事件拦截处理,然后协程就开始启动了。
async 协程启动模式-CoroutineStart
控制协程创建后的调用规则
CoroutineStart.DEFAULT
协程的默认启动模式,为饿汉式调用
。在调用协程后,会立即进入调度状态。
可以在调度前被取消。
CoroutineStart.LAZY
懒汉式调用
,只有需要执行时才会执行。
通过调用以下方法就可以进入调度状态。
job.start()
:启动协程
job.join
:启动协程并等待任务执行结束
job.await()
CoroutineStart.ATOMIC
协程创建后,立即开始调度。
在执行到第一个挂起点之前不会响应cancel()
CoroutineStart.UNDISPATCHED
协程创建后,立即开始调度
直到遇到第一个挂起点之前,都会在当前线程中执行。
协程取消-Cancel
取消协程可以针对CoroutineScope
或Job
去执行。
取消作用域下所有协程
调用CoroutineScope.cancel()
1 2 3 4 5 6 val scope = CoroutineScope(context) ...if (scope.isActive){ scope.cancel() }
适用于页面关闭时,需要回收资源的情况
不能在已取消的作用域中再次启动新的协程。
取消单个协程
针对Job
进行取消,调用cancel()
可以取消正在运行的协程
1 2 3 4 5 6 7 8 9 10 11 12 val job = launch { repeat(1000 ) { i -> println("I'm sleeping $i ..." ) delay(500L ) } } delay(1300L ) println("main: I'm tired of waiting!" ) job.cancel() job.join() println("main: Now I can quit." )
协程之间的关系
协程是存在着父子关系的,取消父协程时,也会取消所有子协程
主要有以下三种关系:
父协程
调用cancel()
或触发异常时,会立即取消所有子协程
;子协程
调用cancel()
不影响父协程及兄弟协程的执行
在底层实现中,子协程通过抛出异常的方式将取消的情况通知到父协程。
父协程通过传入的异常来决定是否处理异常,如果异常为CancellationException
就不做处理。
父协程
必须等到所有子协程
完成才算完成
子协程
抛出未捕获的异常时,默认情况下会取消父协程
(superVisorJob
和CancellationException
除外)
使协程可以取消
协程处理任务的代码必须是协作式 的,需要配合协程取消
进行了处理。
需要在任务处理期间定期检查协程是否已被取消
,或者在处理耗时任务之前就检查当前协程是否已取消
。
目前只有kotlinx.coroutines
所有的挂起函数都是可取消的
,例如delay()
、yield()
等,这些都不需要去检查协程是否已取消。
因此要使协程可以被取消
,可以使用以下两种方法:
通过job.isActive
或ensureActive()
检查协程状态
内部使用delay()
或yield()
等挂起函数——核心在于suspendCoroutineUninterceptedOrReturn
检查Job的活跃状态-isActive
在协程执行过程中,添加isActive
检查协程状态,若!isActive
就不向下执行任务。
1 2 3 4 5 6 7 8 9 val job = scope.launch{ var i = 0 while (i < 5 && isActive){ Log.e("test" ,"now value = ${i++} " ) } } ... job.cancel()
还有一种方式就是ensureActive()
1 2 3 public fun Job.ensureActive () : Unit { if (!isActive) throw getCancellationException() }
使用ensureActive()
可以不用手动去检测isActive
,通过直接抛出异常来结束任务。
使用挂起函数
挂起函数
:delay()
、yield()
等函数,内部核心实现为suspendCancellableCoroutine
delay()
让协程挂起,而且不会阻塞CPU。类似于Thread.sleep()
1 2 3 4 5 6 public suspend fun delay (timeMillis: Long ) { if (timeMillis <= 0 ) return return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit > -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } }
yield()
挂起当前协程,然后将协程分发到Dispatcher
队列,可以让该协程所在线程或线程池可以运行其他协程逻辑,然后等待Dispatcher
空闲的时候继续执行原来的协程任务。类似于Thread.yield()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public suspend fun yield () : Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val context = uCont.context context.checkCompletion() val cont = uCont.intercepted() as ? DispatchedContinuation<Unit > ?: return @sc Unit if (!cont.dispatcher.isDispatchNeeded(context)) { return @sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit } cont.dispatchYield(Unit ) COROUTINE_SUSPENDED }internal fun CoroutineContext.checkCompletion () { val job = get (Job) if (job != null && !job.isActive) throw job.getCancellationException() }
执行yield()
时,会优先检测任务的完成状态,如果!job.isActive
直接抛出CancellableException
suspendCoroutineUninterceptedOrReturn
主要作用为获取当前协程的实例,并且挂起当前协程或者不挂起直接返回结果
。
根据上述源码发现,挂起函数
的关键在于suspendCoroutineUninterceptedOrReturn
,只要使用了该方法,就可以成为挂起函数
。
通过做转换的时候,可以使用系统提供的两个转换函数:
suspendCoroutine
suspendCancellableCoroutine
推荐使用
相关源码 suspendCoroutine 1 2 3 4 5 6 7 suspend fun test () = suspendCoroutine<String> { continuation -> if (...) { continuation.resume("11" ) } else { continuation.resumeWithException(NullPointerException("123" )) } }
suspendCancellableCoroutine 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val aa = 0 suspend fun ttt () = suspendCancellableCoroutine<Int > { cancellableContinuation -> if (aa == 0 ) { cancellableContinuation.resume(1 ) { log("aaa ${it.message} " ) } } else { cancellableContinuation.resumeWithException(IllegalArgumentException("123" )) } cancellableContinuation.invokeOnCancellation { log("我被取消了" ) } }
可以通过continuation.invokeCancellation()
执行取消操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public suspend inline fun <T> suspendCancellableCoroutine ( crossinline block: (CancellableContinuation <T >) -> Unit ) : T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) cancellable.initCancellability() block(cancellable) cancellable.getResult() }
禁止取消
当任务被取消时,挂起函数会收到CancellationException
后续如果需要执行一些其他的挂起函数任务将无法执行。
对挂起函数调用withContext(NonCancellable)
,保证挂起函数正常执行。
关键在于isActive
永远为true
超时取消
大部分取消协程的原因都是超出了预期的执行时间 ,此时就会去触发取消的操作。
对挂起函数调用withTimeout(XX)
或withTimeoutOrNull(XX)
,唯一的区别就是后者会返回null
而不是抛出异常。
原理实现 Dispatchers原理 无论是Dispatchers.Default
或者IO
都是CoroutineDispatcher
的子类。
1 2 3 4 5 6 7 8 public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { //线程调度,指定协程在某一线程上运行 public abstract fun dispatch(context: CoroutineContext, block: Runnable) //封装 Continuation 为 DispatchedContinuation public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) }
CoroutineDispatacher
继承AbstractCoroutineContextElement
类,还实现了ContinuationInterceptor
接口。
DispatchedContinuation
代理协程体Continuation对象并持有线程调度器,负责使用线程调度器将协程体调度到执行的线程执行 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 internal class DispatchedContinuation <in T > ( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { override fun resumeWith (result: Result <T >) { val context = continuation.context val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC_DEFAULT dispatcher.dispatch(context, this ) } else { executeUnconfined(state, MODE_ATOMIC_DEFAULT) { withCoroutineContext(this .context, countOrElement) { continuation.resumeWith(result) } } } } inline fun resumeCancellableWith (result: Result <T >) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this ) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatchedWith(result) } } } } }
DispatchedContinuation
用两个参数构建
dispatcher
:拦截器
continuation
:协程体类对象
其中resumeWith()
和resumeCancellableWith()
负责协程的启动。
//TODO 先挂着
协程启动流程
通过CoroutineScope.launch()
创建一个协程,默认启动模式为ControutineStart.DEFAULT
,创建一个StandaloneCoroutine
协程对象
执行StandaloneCoroutine.start()
实质执行到AbstractCoroutine.start()
,继续触发到CoroutineStart.invoke()
由于默认调度器为Dispatchers.Default
,所以执行到了startCoroutineCancellable()
startCoroutineCancellable()
内部主要有三次调用
createCoroutineUnintercepted()
:创建一个协程体类对象
intercepted
:将协程体类包装成DispatchedContinuation
对象
resumeCancellableWith()
:通过Default
调用到resumeCancellableWith()
实际调用到了DispatchContinuation.resumeCancellableWith()
,最后执行到Continuation.resumeWith()
执行协程任务。
协程挂起/恢复原理
挂起的特点:不阻塞线程 。挂起的本质切线程 ,并且在相应逻辑处理完毕之后,再重新切回线程。
1 2 3 4 5 6 7 8 9 suspend fun loginUser (userId: String , password: String ) : String { val user = logUserIn(userId, password) val userDb = logUserIn(user) return userDb }suspend fun logUserIn (userId: String , password: String ) : Stringsuspend fun logUserIn (userId: String ) : String
反编译后得到
1 2 3 4 5 6 fun loginUser (userId: String , password: String , completion: Continuation <Any ?>) { val user = logUserIn(userId, password) val userDb = logUserIn(user) completion.resume(userDb) }
调用挂起函数
或者suspend lambda表达式
时,都会一个隐式参数
传入,这个参数是Continuation
类型。
CPS:续体传递风格
在每个挂起函数
与suspend lambda表达式
都会附加一个Continuation
参数,并且是用来代替suspend
Continuation接口 挂起函数
通过Continuation
在方法间互相通信,基本实现如下:
1 2 3 4 5 6 7 8 9 10 interface Continuation <in T > { public val context: CoroutineContext public fun resumeWith (value: Result <T >) }public inline fun <T> Continuation<T> .resume (value: T ) : Unit = resumeWith(Result.success(value))public inline fun <T> Continuation<T> .resumeWithException (exception: Throwable ) : Unit = resumeWith(Result.failure(exception))
后续添加resume(value)
和resumeWithException(exception)
可以方便的获取结果,而不需要从Result
解析。
Continuation
主要有以下参数和方法
context
:内部使用的CoroutineContext
resumeWith()
:恢复协程的执行,同时传入一个Result
。内部包括了计算结果
或过程中发生的异常
状态机
Kotlin编译器会确定函数何时可以在内部挂起,每个挂起点都会被声明为有限状态机的一个状态,每个状态用label
表示
查看反编译后源码,内部源码大概如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 @Nullable public final Object loginUser (@NotNull String userId, @NotNull String password, @NotNull Continuation $completion) { Object $continuation; label27: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0 ) { ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE; break label27; } } $continuation = new ContinuationImpl($completion) { Object result; int label; Object L$0 ; Object L$1 ; Object L$2 ; Object L$3 ; @Nullable public final Object invokeSuspend (@NotNull Object $result) { this .result = $result; this .label |= Integer.MIN_VALUE; return MyClass.this .loginUser((String)null , (String)null , this ); } }; } Object var10000; label22: { Object $result = ((<undefinedtype>)$continuation).result; Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String user; switch (((<undefinedtype>)$continuation).label) { case 0 : ResultKt.throwOnFailure($result); ((<undefinedtype>)$continuation).L$0 = this ; ((<undefinedtype>)$continuation).L$1 = userId; ((<undefinedtype>)$continuation).L$2 = password; ((<undefinedtype>)$continuation).label = 1 ; var10000 = this .logUserIn(userId, password, (Continuation)$continuation); if (var10000 == var8) { return var8; } break ; case 1 : password = (String)((<undefinedtype>)$continuation).L$2 ; userId = (String)((<undefinedtype>)$continuation).L$1 ; this = (MyClass)((<undefinedtype>)$continuation).L$0 ; ResultKt.throwOnFailure($result); user = (String)var10000; ((<undefinedtype>)$continuation).L$0 = this ; ((<undefinedtype>)$continuation).L$1 = userId; ((<undefinedtype>)$continuation).L$2 = password; ((<undefinedtype>)$continuation).L$3 = user; ((<undefinedtype>)$continuation).label = 2 ; var10000 = this .logUserIn(user, (Continuation)$continuation); if (var10000 == var8) { return var8; } var10000 = $result; break ; case 2 : user = (String)((<undefinedtype>)$continuation).L$3 ; password = (String)((<undefinedtype>)$continuation).L$2 ; userId = (String)((<undefinedtype>)$continuation).L$1 ; MyClass var9 = (MyClass)((<undefinedtype>)$continuation).L$0 ; ResultKt.throwOnFailure($result); var10000 = $result; break label22; default : throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine" ); } } String userDb = (String)var10000; return userDb; }
Kotlin编译器将每个挂起函数
转换为一个状态机,在每次函数需要挂起时使用回调并进行优化。
观察上述源码发现主要有几个关键点
ContinuationImpl 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 internal abstract class ContinuationImpl ( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor (completion: Continuation<Any?>?) : this (completion, completion?.context) public override val context: CoroutineContext get () = _context!! @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted () : Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this ) ?: this ) .also { intercepted = it } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 internal abstract class BaseContinuationImpl ( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith (result: Result <Any ?>) { var current = this var param = result while (true ) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend (result: Result <Any ?>) : Any? ... }
invokeSuspend()
执行的就是协程体
,当invokeSuspend()
返回值为COROUTINE_SUSPENDED
时,执行return操作
,协程体的操作也会被结束,所以COROUTINE_SUSPENDED
也表示协程发生挂起 。
协程挂起
通过挂起函数将协程挂起,此处拿withContext()
进行分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public suspend fun <T> withContext ( context: CoroutineContext , block: suspend CoroutineScope .() -> T ) : T = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val oldContext = uCont.context val newContext = oldContext + context newContext.checkCompletion() if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) return @sc coroutine.startUndispatchedOrReturn(coroutine, block) } if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) withCoroutineContext(newContext, null ) { return @sc coroutine.startUndispatchedOrReturn(coroutine, block) } } val coroutine = DispatchedCoroutine(newContext, uCont) coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 fun getResult () : Any? { if (trySuspend()) return COROUTINE_SUSPENDED val state = this .state.unboxState() if (state is CompletedExceptionally) throw state.cause @Suppress("UNCHECKED_CAST" ) return state as T } private fun trySuspend () : Boolean { _decision.loop { decision -> when (decision) { UNDECIDED -> if (this ._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true RESUMED -> return false else -> error("Already suspended" ) } } }
协程是否挂起,关键在于是否返回COROUTINE_SUSPENDED ,在getResult()
中就是判断trySuspend()
是否返回true
。
协程恢复 在withContext()
中调用startCoroutine()
传入了两个参数,其中第二个表示协程完成的回调
。
当协程完成的时候会调用resumeWith()
,然后层层传递到JobSupport.afterCompletion()
,最后执行到DispatchedCoroutine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 override fun afterCompletionInternal (state: Any ?, mode: Int ) { if (tryResume()) return super .afterCompletionInternal(state, mode) }private fun tryResume () : Boolean { _decision.loop { decision -> when (decision) { UNDECIDED -> if (this ._decision.compareAndSet(UNDECIDED, RESUMED)) return true SUSPENDED -> return false else -> error("Already resumed" ) } } }
在afterCompletionInternal()
判断协程是否被挂起,若挂起则恢复已被挂起的协程。
然后再回到执行线程上,就会继续执行invokeSuspend()
直到执行结束。
协程并发 参考链接 Kotlin/Keep
Android_开发者
Kotlin协程原理解析
图解协程:suspend