/** * [CoroutineScope] tied to this [ViewModel]. * This scope will be canceled when ViewModel will be cleared, i.e [ViewModel.onCleared] is called * * This scope is bound to * [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate] */ //使用拓展方法 val ViewModel.viewModelScope: CoroutineScope get() { val scope: CoroutineScope? = this.getTag(JOB_KEY) //缓存中读取 对应scope if (scope != null) { return scope } //对应了ViewModel内部的实现代码 return setTagIfAbsent(JOB_KEY, CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)) } //自动取消 coroutineScope internalclassCloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope { overrideval coroutineContext: CoroutineContext = context
privatefinal Map<String, Object> mBagOfTags = new HashMap<>(); //同步存值 <T> T setTagIfAbsent(String key, T newValue) { T previous; synchronized (mBagOfTags) { previous = (T) mBagOfTags.get(key); if (previous == null) { mBagOfTags.put(key, newValue); } } T result = previous == null ? newValue : previous; if (mCleared) { closeWithRuntimeException(result); } return result; }
@MainThread //销毁ViewModel final void clear() { mCleared = true; // Since clear() is final, this method is still called on mock objects // and in those cases, mBagOfTags is null. It'll always be empty though // because setTagIfAbsent and getTag are not final so we can skip // clearing it if (mBagOfTags != null) { synchronized (mBagOfTags) { //读取缓存好的 viewModelScope对象 for (Object value : mBagOfTags.values()) { // see comment for the similar call in setTagIfAbsent closeWithRuntimeException(value); } } } onCleared(); }
... fundispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() //任务优先插入本地队列中执行 val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) //本地队列已满,就将任务插入到全局队列中 if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } ... } //执行的任务 internalinnerclassWorkerprivateconstructor() : Thread() { overridefunrun() = runWorker() //执行任务 privatefunrunWorker() { var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { //从队列中找到任务 val task = findTask(mayHaveLocalTasks) ... } } //找到本地队列任务 funfindTask(scanLocalQueue: Boolean): Task? { //获取CPU控制权 if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) // If we can't acquire a CPU permit -- attempt to find blocking task val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } //从其他队列获取任务执行 return task ?: trySteal(blockingOnly = true) }
//从其他队列获取任务 privatefuntrySteal(blockingOnly: Boolean): Task? { assert { localQueue.size == 0 } val created = createdWorkers // 0 to await an initialization and 1 to avoid excess stealing on single-core machines if (created < 2) { returnnull }
var currentIndex = nextInt(created) var minDelay = Long.MAX_VALUE repeat(created) { ++currentIndex if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] if (worker !== null && worker !== this) { assert { localQueue.size == 0 } //从其他工作线程中获取任务来执行 val stealResult = if (blockingOnly) { localQueue.tryStealBlockingFrom(victim = worker.localQueue) } else { localQueue.tryStealFrom(victim = worker.localQueue) } if (stealResult == TASK_STOLEN) { return localQueue.poll() } elseif (stealResult > 0) { minDelay = min(minDelay, stealResult) } } } minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else0 returnnull }
//加载各个平台下定义的`MainDispatcherFactory` privatefunloadMainDispatcher(): MainCoroutineDispatcher { returntry { val factories = if (FAST_SERVICE_LOADER_ENABLED) { FastServiceLoader.loadMainDispatcherFactory() } else { // We are explicitly using the // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()` // form of the ServiceLoader call to enable R8 optimization when compiled on Android. ServiceLoader.load( MainDispatcherFactory::class.java, MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf") factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) { // Service loader can throw an exception as well createMissingDispatcher(e) } }
拿Android举例,分析下如何实现Dispatchers.Main功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
internalfunloadMainDispatcherFactory(): List<MainDispatcherFactory> { val clz = MainDispatcherFactory::class.java if (!ANDROID_DETECTED) { return load(clz, clz.classLoader) }
returntry { val result = ArrayList<MainDispatcherFactory>(2) //加载对应类名的类 createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) } createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) } result } catch (e: Throwable) { // Fallback to the regular SL in case of any unexpected exception load(clz, clz.classLoader) } }
publicfunhandleCoroutineException(context: CoroutineContext, exception: Throwable) { // Invoke an exception handler from the context if present //若context存在 ExceptionHandler 则会对异常进行处理 try { context[CoroutineExceptionHandler]?.let { it.handleException(context, exception) return } } catch (t: Throwable) { handleCoroutineExceptionImpl(context, handlerException(exception, t)) return } // If a handler is not present in the context or an exception was thrown, fallback to the global handler handleCoroutineExceptionImpl(context, exception) }
`async`执行指向如下代码 publicfun<T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspendCoroutineScope.() -> T ): Deferred<T> { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine<T>(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
privateopenclassDeferredCoroutine<T>( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> { //没有对 exception进行处理,导致全局异常无法处理 overridefungetCompleted(): T = getCompletedInternal() as T overridesuspendfunawait(): T = awaitInternal() as T overrideval onAwait: SelectClause1<T> get() = this overridefun<R>registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) = registerSelectClause1Internal(select, block) }
privatefunnotifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) notifyHandlers<JobCancellingNode<*>>(list, cause) // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent }
privatefuncancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) returntrue
val isCancellation = cause is CancellationException val parent = parentHandle // No parent -- ignore CE, report other exceptions. if (parent === null || parent === NonDisposableHandle) { return isCancellation } //调用所有子协程进行取消任务,子线程也会收到 CancellException return parent.childCancelled(cause) || isCancellation }
//EventLoopImplBase publicoverridefunscheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val timeNanos = delayToNanos(timeMillis)//设置执行时间 if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedResumeTask(now + timeNanos, continuation).also { task -> //按照延时添加任务 continuation.disposeOnCancellation(task) schedule(now, task) } } }
关键点就在于suspendCancellableCoroutine提供了普通函数的挂起转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicsuspendinlinefun<T>suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) /* * For non-atomic cancellation we setup parent-child relationship immediately * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but * properly supports cancellation. */ cancellable.initCancellability() block(cancellable) cancellable.getResult() }
suspend fun test9() { val startTime = System.currentTimeMillis() val dispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher() val job = coroutineScope.launch(dispatcher) { var nextPrintTime = startTime var i = 0 while (isActive) { // 一个浪费 CPU 的计算任务循环 // 每秒打印信息 2 次 if (System.currentTimeMillis() >= nextPrintTime) { i++ log(i) nextPrintTime += 500L } } } delay(1300L) // 等待一段时间 println("main: I'm tired of waiting!") job.cancelAndJoin() // 取消 job, 并等待它结束 println("main: Now I can quit.") dispatcher.close() }
输出结果: 19:30:01:805 [pool-1-thread-1] 1 19:30:02:292 [pool-1-thread-1] 2 19:30:02:792 [pool-1-thread-1] 3 main: I'm tired of waiting! main: Now I can quit.
privatefuntryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean { assert { state !is Finishing } // only for non-finishing states assert { state.isActive } // only for active states // get state's list or else promote to list to correctly operate on child lists val list = getOrPromoteCancellingList(state) ?: returnfalse // Create cancelling state (with rootCause!) val cancelling = Finishing(list, false, rootCause) if (!_state.compareAndSet(state, cancelling)) returnfalse // Notify listeners notifyCancelling(list, rootCause) returntrue }
@SinceKotlin("1.3") publicinterfaceContinuation<in T> { /** * The context of the coroutine that corresponds to this continuation. * 定义的协程上下文 */ publicval context: CoroutineContext
/** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. * 执行完毕回调,回调成功或失败的结果 */ publicfunresumeWith(result: Result<T>) } //提供拓展函数,支持直接调用成功或失败回调 publicinlinefun<T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
publicinlinefun<T> Continuation<T>.resumeWithException(exception: Throwable): Unit = resumeWith(Result.failure(exception))