Kotlin协程原理

在介绍了大部分概念,现在需要针对这些概念进行详细的说明以及分析。

协程是轻量级的线程

协程概念

非抢占式或协作式的计算机并发调度的实现,程序可以主动挂起或者恢复执行,

避免在异步程序中使用大量的回调,使用阻塞的方式写出非阻塞的代码。

一种全新处理并发的方式,可以在Android平台上简化异步执行的代码。

协程主要用来解决两个问题:

  • 处理耗时任务
  • 保证主线程安全

在后面的原理介绍中,会介绍与这两个问题相关的概念。

概念介绍

在执行过程中会涉及的一些概念,在后续源码分析也会涉及。

协程体

协程中要执行的操作,是一个被suspend修饰的lambda表达式

挂起函数

suspend修饰的函数,只能在挂起函数或者协程体中调用。可以通过调用其他挂起函数挂起执行代码,而不阻塞当前执行线程。

挂起点

一般对应挂起函数被调用的位置

续体-Continuation

挂起的协程在挂起点时的状态。概念上表示挂起点之后的剩余应该执行的代码

核心类

协程作用域—CoroutineScope

追踪每一个通过launchasync创建的协程。并且任何时候都可以通过scope.cancel()取消正在执行的协程。

可以通过CoroutineScope控制协程的生命周期,当Activity/Fragment关闭时,调用cancel()及时关闭。

协程作用域主要有三种:

阻塞协程作用域

调用runBlocking()的线程会被阻塞直到内部协程任务执行完毕。

全局协程作用域

GlobalScope作用于整个应用的生命周期,并且无法被取消,在界面上使用时,就会导致内存泄漏。

1
2
3
4
5
6
7
8
9
10

public object GlobalScope : CoroutineScope {
// 重写coroutineContext,返回一个空的协程上下文
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
public interface CoroutineScope {
// 协程上下文
public val coroutineContext: CoroutineContext
}

自定义作用域

自定义协程作用域,可以针对性的控制避免内存泄漏。

1
2
val coroutineContext : CoroutineContext = Dispatchers.Main + Job()//协程上下文
val coroutineScope = CoroutineScope(coroutineContext)//自定义作用域

需要自定义协程作用域时,需要构造一个CoroutineContext作为参数。

CoroutineContext会在下面重点介绍。

内置的自定义作用域
MainScope

为了方便开发使用,Kotlin标准库提供了MainScope用于快速生成CoroutineScope

1
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
viewModelScope

在AndroidX中引入的viewMdoelScope,在ViewModel销毁时会自动取消协程任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
//缓存中读取 对应scope
if (scope != null) {
return scope
}
//对应了ViewModel内部的实现代码
return setTagIfAbsent(JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
}
//自动取消 coroutineScope
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context

override fun close() {
coroutineContext.cancel()
}
}

协程上下文-CoroutineContext

一组定义协程行为的元素,本体是一个数据结构,类似于Map,内部实现为单链表

由如下几项构成:

  • Job:执行的任务
  • CoroutineDispatcher:协程调度器
  • CoroutineName:协程的名称,主要用于调试
  • CoroutineExceptionHandler:处理未被捕获的异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

public operator fun plus(context: CoroutineContext): CoroutineContext =
//空实现直接返回
if (context === EmptyCoroutineContext) this else
// 遍历context集合
context.fold(this) { acc, element -> //acc 当前上下文集合 element context集合的元素
val removed = acc.minusKey(element.key) //移除对应集合的元素
if (removed === EmptyCoroutineContext) element else {
val interceptor = removed[ContinuationInterceptor] //获取拦截器
if (interceptor == null) CombinedContext(removed, element) //生成最后的CombinedContext节点
else {
//拦截器永远位于 链表尾部
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

public fun minusKey(key: Key<*>): CoroutineContext

}

自定义CoroutineContext

1
val coroutineContext : CoroutineContext = Dispatchers.Main + Job() + CoroutineName("name")//协程上下文

CoroutineContext通过+进行元素的合并,+右侧的元素会覆盖左侧的元素。

CoroutineContext存储方式为左向链表,链表的每一个节点都是CombinedContext,当存在协程拦截器时,永远处于链表的最后。

经过上述的plus操作后,最后得到一个完整的CoroutineContext对象。

CoroutineContext的父子关系

每个协程都会有一个父对象,协程的父级CoroutineContext和父协程的CoroutineContext是不一致的。

父级上下文 = 默认值 + 继承的CoroutineContext+参数

默认值:一些元素包含的默认值,例如默认Dispatcher就是Dispatchers.Default

继承的CoroutineContext:父协程的CoroutineContenxt

参数:后续子协程配置的参数,如上文所示组成部分,新添加的参数会覆盖前面的对应配置。

协程执行任务-Job

用于处理协程,封装了协程需要执行的代码逻辑,并且负责管理协程的生命周期。

通过协程构造器创建的协程都会返回一个Job实例

主要有以下几种生命周期:

  • New 新建任务
  • Active 任务活跃
  • Completing 任务完成中
  • Cancelling 任务取消中
  • Cancelled 任务已取消
  • Completed 任务已完成

Job生命周期

Job内提供了isActive()isCancelled()isCompleted()等属性用于判断协程的状态。

协程取消会更多的分析Job相关。

协程调度器-CoroutineDispatcher

Dispatchers是协程中提供的线程调度器,用来切换线程,指定协程运行的线程。

默认提供了四种调度器

Dispatchers.Default

默认调度器,适合处理后台运算,为CPU密集型任务调度器

Dispatchers.IO(仅JVM可用)

适合执行IO相关操作,例如读写文件等,为IO密集型任务调度器

Dispatchers.Main

UI调度器,根据执行平台的不同会初始化为对应平台的UI线程调度器。

在Android中,就会通过Handler调度任务到UI线程执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//加载各平台下定义的MainDispatcherFactory
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader

).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}

internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
val clz = MainDispatcherFactory::class.java
if (!ANDROID_DETECTED) {
return load(clz, clz.classLoader)
}

return try {
val result = ArrayList<MainDispatcherFactory>(2)
//加载对应类名的类
createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
// Fallback to the regular SL in case of any unexpected exception
load(clz, clz.classLoader)
}
}

按照类名去加载,Android下的名为kotlinx.coroutions.android.AndroidDispatcherFactory的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//在Android编译完成后,可以读取到该类
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)

//android中需要向主looper进行提交调度
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}

//通过持有主线程looper的handler进行调度
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
Dispatchers.Main.immediate

适用于响应一个UI事件后从而启动一个协程时,会在下一帧中去立即执行任务。

Dispatchers.Unconfined

非限制的调度器,在遇到第一个挂起函数前的代码运行在原线程中,执行挂起函数后,就切换线程运行。

调度器间的差异

自定义调度器

DefaultIO的底层实现都依赖于线程池,执行到挂起函数时还是会发生线程的切换,可以通过自定义调度器减少这类切换的发生。

1
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher() //转换线程池到 Dispatcher

协程拦截器-ContinuationInterceptor

ContinuationInterceptor是一个拦截器的接口定义,用于控制协程的执行流程。

CoroutineContext中,实现了ContinuationInterceptor接口的类,永远会处于最后一位,保证不会被其他类覆盖。

协程拦截器只能存在一个!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ContinuationInterceptor.kt
public interface ContinuationInterceptor : CoroutineContext.Element {
// 实现CoroutineContext.Element接口,说明自身是CoroutineContext上下文集合的一个元素类型
// 定义伴生对象Key作为集合中的索引key,可直接通过类名访问该伴生对象
companion object Key : CoroutineContext.Key<ContinuationInterceptor>

// 传入一个Continuation对象,并返回一个新的Continuation对象
// 在协程中,这里的传参continuation就是协程体编译后Continuation对象
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

public fun releaseInterceptedContinuation(continuation: Continuation<*>) {

...
}

其中CoroutineDispatcher就是基于ContinuationInterceptor所实现的。

协程异常处理-CoroutineExceptionHandler

所有未被捕获的异常一定会抛出,无论使用哪种Job!!!

当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给他的父级。

△ 协程中的异常会通过协程的层级不断传播

主要执行以下几步:

  1. 取消它的子级任务
  2. 取消自己的任务
  3. 把异常继续向上传递到自己的父级

SupervisorJob

使用Job时,若发生异常会导致异常传递,使得所有的任务都会被取消。

使用SupervisorJob,一个子协程运行失败不会传播异常,只会影响自身,其他任务都不会受到影响。

SupervisorJob只有在supervisorScopeCoroutineScope(SupervisorJob())内执行才可以生效。

SupervisorJob 不会取消它其他的子级

CoroutineScope(SupervisorJob())

1
2
3
4
5
6
7
8
val scope = CoroutineScope(SupervisorJob())
scope.launch{
//child 1
}
scope.launch{
//child 2
}
//若child1 发生异常不会影响 child2

supervisorScope

1
2
3
4
5
6
7
8
supervisorScope {
launch {
throw NullPointerException("123")
}
launch {
System.err.println(3)
}
}

使用这两种方式都可以保证异常不向上传播

Job VS SupervisorJob

如果想在出现错误时不会退出父级和其他平级的协程,就要使用SupervisorJobsupervisorScope

局部异常捕获

根据不同的协程构造器,处理方式也不尽相同

launch()

主要采用try{}catch{}的形式进行异常捕获

1
2
3
4
5
6
7
scope.launch {
try {
codeThatCanThrowExceptions()
} catch(e: Exception) {
// 处理异常
}
}

launch()时,异常会在第一时间被抛出。

async/await()

只有当async()作为根协程时,不会自动抛出异常,而是要等到await()执行时才抛出异常。

根协程coroutintScopesupervisorScope的直接子协程,或者类似scope.async()这种实现。

这种情况下可以通过try{}catch{}捕获异常

1
2
3
4
5
6
//supervisorScope
coroutineScope{
val deferred = async{
throw NullPointerException("123")
}
}

此时不会执行任何

只有在调用.await()时才会抛出异常,此时就可以添加try{}catch{}捕获异常。

针对async()这种情况,最有效的方式就是async()内部进行try{}catch{}

全局异常捕获

类似Java,协程也提供了捕获全局异常(未声明捕获异常)的方式

Java的全局异常捕获方式

1
2
3
4
5
6
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
//TODO 异常处理
}
});
协程内全局异常捕获方式

主要使用的是CoroutineExceptionHandler,可以帮助处理一些未捕获的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
log("Throws an exception with message: ${throwable.message}")
}

val context = Dispatchers.Main + Job() + exceptionHandler
val scope = CoroutineScope(context)

fun main(){
scope.launch {
launch {
throw NullPointerException("1234")
}
delay(1000)
}
}

此时就会捕获到NPE

需要CoroutineExceptionHandler生效需要两个条件:

  1. 异常是被自动抛出异常的协程所抛出的。(只能是 launch(),async()这种是不可以的)
  2. 必须在根协程中,coroutintScopesupervisorScope的直接子协程,或者类似scope.async()这种实现。
真·全局异常捕获

上面说到的CoroutineExceptionHandler只能在协程内部使用,无法兼顾其他协程的异常情况。此时就需要使用另一种方式,使用ServiceLoader实现全局内协程异常捕获

实现这个功能需要如下几步:

  1. 新建全局CoroutineExceptionHandler
  2. classPath中注册该类
    • src/main/目录下的,resources/META-INF/services文件夹
    • 新建kotlinx.coroutines.CoroutineExceptionHandler文件
    • 文件内写入自定义的全局CoroutineExceptionHandler完整类名

同样这种配置方式也只对launch()生效。

这里主要应用了SPI机制

全称为Service Provider Interface,JDK内置的一种服务提供发现机制,主要源码实现在java.util.ServiceLoader

使用过程:

需要在resources/META-INF/services目录下创建与服务同名的全限定名相同的文件,然后在文件中写入服务提供者的全限定名

原理简介:

主要通过反射调用配置的类进行实例化,反射成功后存入缓存,后续使用直接从缓存重新读取。

总结
  1. 协程内部异常处理流程

    • 在作用域内使用try..catch可以直接捕获子线程中的异常。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      try{
      launch{

      }

      async{

      }
      }catch{...}{

      }
  • 如果未设置异常捕获,则会走全局异常捕获流程(只在launch创建协程下生效)

    • 若设置CoroutineExceptionHandler则处理。必须在根协程下才可以生效
    • 没配置,向GlobalExceptionHandler进行处理,该配置是全局的,对所有协程任务生效
  1. 异常传播在不同作用域的表现

    • GlobalScope:异常不会向外传递,因为已经是根协程
    • coroutineScope:异常进行双向传递,父协程和子协程都会被取消
    • supervisorScope:异常进行单向传递,只有父协程向子协程传递异常,子协程会被取消,父协程不受影响
  2. launch/joinasync/await表现不同

    launch/join关注的是任务是否执行完成async/await关注的是任务的执行结果,所以在局部异常捕获的时候,两种创建方式的异常捕获也会有区别

  3. 想要避免异常传播,就要使用SupervisorJob;不在意就用Job

协程构造器-CoroutineBuilder

主要负责构造一个协程并启动它

常用的有两种方法

launch(重点分析)

默认创建一个新的协程,并返回Job对象,通过Job管理协程。

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
)
: Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

主要有三个参数:

  • context:就是前面介绍的CoroutineContext
  • start协程启动模式
  • block:需要执行的任务,由suspend修饰
newCoroutineContext

将传参的contextContextScope配置的context进行合并,并返回一个新的context

1
2
3
4
5
6
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
StandaloneCoroutine/LazyStandaloneCoroutine
1
2
3
4
5
6
7
8
9
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

继承AbstractCoroutine且重写了handleJobException(),这也是为什么CoroutineExceptionHandler可以监听到异常的原因。

1
2
3
4
5
6
7
8
9
10
11
12
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
private var block: (suspend CoroutineScope.() -> Unit)? = block

override fun onStart() {
val block = checkNotNull(this.block) { "Already started" }
this.block = null
block.startCoroutineCancellable(this, this)
}
}

LazyStandaloneCoroutine重写了onStart(),只有在调用到start()/join()等方法才会执行。

start()

启动协程任务

1
2
3
4
5
//AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}

调用到CoroutineStart.invoke()

1
2
3
4
5
6
7
8
//CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

默认使用CoroutineStart.DEFAULT,以这个作为示例分析

1
2
3
4
5
6
7
//Cancellable.kt
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellable(Unit)
}

主要流程分为三步:

createCoroutineUninterecpted
1
2
3
4
5
6
7
8
9
10
11
12
13
//IntrinsicsJvm.kt
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}

主要是为了创建Continuation对象

intercepted
1
2
3
4
5
6
7
8
9
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this


//ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

如果设置了ContinutionInterceptor,就获取并执行interceptContinuation()

resumeCancellable
1
2
3
4
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}

进行线程调度或者事件拦截处理,然后协程就开始启动了。

async

协程启动模式-CoroutineStart

控制协程创建后的调用规则

CoroutineStart.DEFAULT

协程的默认启动模式,为饿汉式调用。在调用协程后,会立即进入调度状态。

可以在调度前被取消。

CoroutineStart.LAZY

懒汉式调用,只有需要执行时才会执行。

通过调用以下方法就可以进入调度状态。

  • job.start():启动协程
  • job.join:启动协程并等待任务执行结束
  • job.await()

CoroutineStart.ATOMIC

协程创建后,立即开始调度。

在执行到第一个挂起点之前不会响应cancel()

CoroutineStart.UNDISPATCHED

协程创建后,立即开始调度

直到遇到第一个挂起点之前,都会在当前线程中执行。

协程取消-Cancel

取消协程可以针对CoroutineScopeJob去执行。

取消作用域下所有协程

调用CoroutineScope.cancel()

1
2
3
4
5
6
val scope = CoroutineScope(context)

...
if(scope.isActive){ //判断当前scope是否活跃
scope.cancel()
}

适用于页面关闭时,需要回收资源的情况

不能在已取消的作用域中再次启动新的协程。

取消单个协程

针对Job进行取消,调用cancel()可以取消正在运行的协程

1
2
3
4
5
6
7
8
9
10
11
12
//官方示例代码
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消 job
job.join() // 等待 job 结束
println("main: Now I can quit.")
协程之间的关系

协程是存在着父子关系的,取消父协程时,也会取消所有子协程

主要有以下三种关系:

  1. 父协程调用cancel()或触发异常时,会立即取消所有子协程子协程调用cancel()不影响父协程及兄弟协程的执行

    在底层实现中,子协程通过抛出异常的方式将取消的情况通知到父协程。

    父协程通过传入的异常来决定是否处理异常,如果异常为CancellationException就不做处理。

  2. 父协程必须等到所有子协程完成才算完成

  3. 子协程抛出未捕获的异常时,默认情况下会取消父协程(superVisorJobCancellationException除外)

使协程可以取消

协程处理任务的代码必须是协作式的,需要配合协程取消进行了处理。

需要在任务处理期间定期检查协程是否已被取消,或者在处理耗时任务之前就检查当前协程是否已取消

目前只有kotlinx.coroutines所有的挂起函数都是可取消的,例如delay()、yield()等,这些都不需要去检查协程是否已取消。

因此要使协程可以被取消,可以使用以下两种方法:

  • 通过job.isActiveensureActive()检查协程状态
  • 内部使用delay()yield()等挂起函数——核心在于suspendCoroutineUninterceptedOrReturn
检查Job的活跃状态-isActive

在协程执行过程中,添加isActive检查协程状态,若!isActive就不向下执行任务。

1
2
3
4
5
6
7
8
9
val job = scope.launch{
var i = 0
while(i < 5 && isActive){
Log.e("test","now value = ${i++}")
}
}

...
job.cancel()

还有一种方式就是ensureActive()

1
2
3
public fun Job.ensureActive(): Unit {
if (!isActive) throw getCancellationException()
}

使用ensureActive()可以不用手动去检测isActive,通过直接抛出异常来结束任务。

使用挂起函数

挂起函数delay()yield()等函数,内部核心实现为suspendCancellableCoroutine

delay()

让协程挂起,而且不会阻塞CPU。类似于Thread.sleep()

1
2
3
4
5
6
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
yield()

挂起当前协程,然后将协程分发到Dispatcher队列,可以让该协程所在线程或线程池可以运行其他协程逻辑,然后等待Dispatcher空闲的时候继续执行原来的协程任务。类似于Thread.yield()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) {
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
}
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}

internal fun CoroutineContext.checkCompletion() {
val job = get(Job)
if (job != null && !job.isActive) throw job.getCancellationException()
}

执行yield()时,会优先检测任务的完成状态,如果!job.isActive直接抛出CancellableException

suspendCoroutineUninterceptedOrReturn

主要作用为获取当前协程的实例,并且挂起当前协程或者不挂起直接返回结果

根据上述源码发现,挂起函数的关键在于suspendCoroutineUninterceptedOrReturn,只要使用了该方法,就可以成为挂起函数

通过做转换的时候,可以使用系统提供的两个转换函数:

  • suspendCoroutine
  • suspendCancellableCoroutine推荐使用

相关源码

suspendCoroutine
1
2
3
4
5
6
7
suspend fun test() = suspendCoroutine<String> { continuation ->
if (...) {
continuation.resume("11")
} else {
continuation.resumeWithException(NullPointerException("123"))
}
}
suspendCancellableCoroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val aa = 0
suspend fun ttt() = suspendCancellableCoroutine<Int> { cancellableContinuation ->
if (aa == 0) {
//执行完毕抛出结果
cancellableContinuation.resume(1) {
// 执行过程异常捕获
log("aaa ${it.message}")
}
} else {
cancellableContinuation.resumeWithException(IllegalArgumentException("123"))
}

cancellableContinuation.invokeOnCancellation {
//协程任务执行cancel时,回调该方法
log("我被取消了")
}
}

可以通过continuation.invokeCancellation()执行取消操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
)
: T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/

cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}

禁止取消

当任务被取消时,挂起函数会收到CancellationException后续如果需要执行一些其他的挂起函数任务将无法执行。

对挂起函数调用withContext(NonCancellable),保证挂起函数正常执行。

关键在于isActive永远为true

超时取消

大部分取消协程的原因都是超出了预期的执行时间,此时就会去触发取消的操作。

对挂起函数调用withTimeout(XX)withTimeoutOrNull(XX),唯一的区别就是后者会返回null而不是抛出异常。

原理实现

Dispatchers原理

无论是Dispatchers.Default或者IO都是CoroutineDispatcher的子类。

1
2
3
4
5
6
7
8
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//线程调度,指定协程在某一线程上运行
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//封装 Continuation 为 DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}

CoroutineDispatacher继承AbstractCoroutineContextElement类,还实现了ContinuationInterceptor接口。

DispatchedContinuation

代理协程体Continuation对象并持有线程调度器,负责使用线程调度器将协程体调度到执行的线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {


override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
// 是否需要线程调度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
// dispatch 调度线程,第二个参数是一个Runnable类型,这里传参this也就是DispatchedContinuation自身
// DispatchedContinuation实际上也是一个Runnable对象,调用调度器的dispatch方法之后就可以使这个runnable在指定的线程运行了
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
// 不需要调度,执行协程体的resumeWith
continuation.resumeWith(result)
}
}
}
}
// 默认启动模式
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}

}

DispatchedContinuation用两个参数构建

  • dispatcher:拦截器
  • continuation:协程体类对象

其中resumeWith()resumeCancellableWith()负责协程的启动。

//TODO 先挂着

协程启动流程

  1. 通过CoroutineScope.launch()创建一个协程,默认启动模式为ControutineStart.DEFAULT,创建一个StandaloneCoroutine协程对象
  2. 执行StandaloneCoroutine.start()实质执行到AbstractCoroutine.start(),继续触发到CoroutineStart.invoke()
  3. 由于默认调度器为Dispatchers.Default,所以执行到了startCoroutineCancellable()
  4. startCoroutineCancellable()内部主要有三次调用
    • createCoroutineUnintercepted():创建一个协程体类对象
    • intercepted:将协程体类包装成DispatchedContinuation对象
    • resumeCancellableWith():通过Default调用到resumeCancellableWith()
  5. 实际调用到了DispatchContinuation.resumeCancellableWith(),最后执行到Continuation.resumeWith()执行协程任务。

协程挂起/恢复原理

挂起的特点:不阻塞线程。挂起的本质切线程,并且在相应逻辑处理完毕之后,再重新切回线程。

1
2
3
4
5
6
7
8
9
suspend fun loginUser(userId: String, password: String): String {
val user = logUserIn(userId, password)
val userDb = logUserIn(user)
return userDb
}

suspend fun logUserIn(userId: String, password: String): String

suspend fun logUserIn(userId: String): String

反编译后得到

1
2
3
4
5
6

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
val user = logUserIn(userId, password)
val userDb = logUserIn(user)
completion.resume(userDb)
}

调用挂起函数或者suspend lambda表达式时,都会一个隐式参数传入,这个参数是Continuation类型。

CPS:续体传递风格

在每个挂起函数suspend lambda表达式都会附加一个Continuation参数,并且是用来代替suspend

Continuation接口

挂起函数通过Continuation在方法间互相通信,基本实现如下:

1
2
3
4
5
6
7
8
9
10
interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(value: Result<T>)
}

public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))

public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))

后续添加resume(value)resumeWithException(exception)可以方便的获取结果,而不需要从Result解析。

Continuation主要有以下参数和方法

  • context:内部使用的CoroutineContext
  • resumeWith():恢复协程的执行,同时传入一个Result。内部包括了计算结果过程中发生的异常

状态机

Kotlin编译器会确定函数何时可以在内部挂起,每个挂起点都会被声明为有限状态机的一个状态,每个状态用label表示

查看反编译后源码,内部源码大概如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Nullable
public final Object loginUser(@NotNull String userId, @NotNull String password, @NotNull Continuation $completion) {
Object $continuation;
label27: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label27;
}
}

$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
Object L$1;
Object L$2;
Object L$3;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return MyClass.this.loginUser((String)null, (String)null, this);
}
};
}

Object var10000;
label22: {
Object $result = ((<undefinedtype>)$continuation).result;
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String user;
switch(((<undefinedtype>)$continuation).label) {
case 0:
//错误检查
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = this;
((<undefinedtype>)$continuation).L$1 = userId;
((<undefinedtype>)$continuation).L$2 = password;
//设置 label为1 下次执行切换到 case 1
((<undefinedtype>)$continuation).label = 1;
//当前状态机执行的流程
var10000 = this.logUserIn(userId, password, (Continuation)$continuation);
if (var10000 == var8) {
return var8;
}
break;
case 1:
password = (String)((<undefinedtype>)$continuation).L$2;
userId = (String)((<undefinedtype>)$continuation).L$1;
this = (MyClass)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
user = (String)var10000;
((<undefinedtype>)$continuation).L$0 = this;
((<undefinedtype>)$continuation).L$1 = userId;
((<undefinedtype>)$continuation).L$2 = password;
((<undefinedtype>)$continuation).L$3 = user;
((<undefinedtype>)$continuation).label = 2;
var10000 = this.logUserIn(user, (Continuation)$continuation);
if (var10000 == var8) {
return var8;
}
var10000 = $result;
break;
case 2:
user = (String)((<undefinedtype>)$continuation).L$3;
password = (String)((<undefinedtype>)$continuation).L$2;
userId = (String)((<undefinedtype>)$continuation).L$1;
MyClass var9 = (MyClass)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label22;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

}

String userDb = (String)var10000;
return userDb;
}

Kotlin编译器将每个挂起函数转换为一个状态机,在每次函数需要挂起时使用回调并进行优化。

观察上述源码发现主要有几个关键点

ContinuationImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

public override val context: CoroutineContext
get() = _context!!

@Transient
private var intercepted: Continuation<Any?>? = null

public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

}

//其中 invokeSuspend()是由BaseContinuationImpl实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//调用 invokeSuspend 真正执行协程体
val outcome = invokeSuspend(param)
//如果返回值为 CORPUTINE_SUSPENDED ,需要执行挂起操作
if (outcome === COROUTINE_SUSPENDED) return
//协程体执行成功
Result.success(outcome)
} catch (exception: Throwable) {
//协程体执行异常
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
//此处表示 StandaloneCoroutine
completion.resumeWith(outcome)
return
}
}
}
}

protected abstract fun invokeSuspend(result: Result<Any?>): Any?
...
}

invokeSuspend()执行的就是协程体,当invokeSuspend()返回值为COROUTINE_SUSPENDED时,执行return操作,协程体的操作也会被结束,所以COROUTINE_SUSPENDED也表示协程发生挂起

协程挂起

通过挂起函数将协程挂起,此处拿withContext()进行分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
)
: T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->

val oldContext = uCont.context
val newContext = oldContext + context
//检查协程是否活跃
newContext.checkCompletion()

if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}

if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_CANCELLABLE
coroutine.initParentJob()
//coroutine 为 DispatchedCoroutine,持有需要恢复的协程
block.startCoroutineCancellable(coroutine, coroutine)
//返回结果为 挂起 还是完成
coroutine.getResult()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//DispatchedCoroutine.kt
fun getResult(): Any? {
//需要挂起,则返回COROUTINE_SUSPENDED
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}

private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}

协程是否挂起,关键在于是否返回COROUTINE_SUSPENDED,在getResult()中就是判断trySuspend()是否返回true

协程挂起

协程恢复

withContext()中调用startCoroutine()传入了两个参数,其中第二个表示协程完成的回调

当协程完成的时候会调用resumeWith(),然后层层传递到JobSupport.afterCompletion(),最后执行到DispatchedCoroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
override fun afterCompletionInternal(state: Any?, mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
// otherwise, getResult has already commenced, i.e. completed later or in other thread
super.afterCompletionInternal(state, mode)
}

private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}

afterCompletionInternal()判断协程是否被挂起,若挂起则恢复已被挂起的协程。

然后再回到执行线程上,就会继续执行invokeSuspend()直到执行结束。

协程恢复

协程并发

参考链接

Kotlin/Keep

Android_开发者

Kotlin协程原理解析

图解协程:suspend


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!