val job = launch(start = CoroutineStart.LAZY) { println("Active")}println("New")job.join()println("Completed") /**打印结果**/NewActiveCompleted /*********** 1. 以 lazy 方式创建出来的协程 state 为 New* 2. 对应的 job 调用 join 函数后,协程进入 Active 状态,并开始执行协程对应的具体代码* 3. 当协程执行完毕后,由于没有需要等待的子协程,协程直接进入 Completed 状态*/
关于Job,常用的方法有:
//活跃的,是否仍在执行public val isActive: Boolean //启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为falsepublic fun start(): Boolean //取消Job,可通过传入Exception说明具体原因public fun cancel(cause: CancellationException? = null) //挂起协程直到此Job完成public suspend fun join() //取消任务并等待任务完成,结合了[cancel]和[join]的调用public suspend fun Job.cancelAndJoin() //给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
public interface CoroutineScope { public val coroutineContext: CoroutineContext}
简而言之,协程上下文是协程必备组成部分,管理了协程的线程绑定、生命周期、异常处理和调试。
4.1.1 协程上下文结构
看一下CoroutineContext的接口方法:
public interface CoroutineContext { //操作符[]重载,可以通过CoroutineContext[Key]这种形式来获取与Key关联的Element public operator fun <E : Element> get(key: Key<E>): E? //提供遍历CoroutineContext中每一个Element的能力,并对每一个Element做operation操作 public fun <R> fold(initial: R, operation: (R, Element) -> R): R //操作符+重载,可以CoroutineContext + CoroutineContext这种形式把两个CoroutineContext合并成一个 public operator fun plus(context: CoroutineContext): CoroutineContext = ....... //返回一个新的CoroutineContext,这个CoroutineContext删除了Key对应的Element public fun minusKey(key: Key<*>): CoroutineContext //Key定义,空实现,仅仅做一个标识 public interface Key<E : Element> ///Element定义,每个Element都是一个CoroutineContext public interface Element : CoroutineContext { //每个Element都有一个Key实例 public val key: Key<*> ...... }}
Element:协程上下文的一个元素,本身就是一个单例上下文,里面有一个key,是这个元素的索引。
可知,Element本身也实现了CoroutineContext接口。
这里我们再看一下官方解释:
/** * Persistent context for the coroutine. It is an indexed set of [Element] instances. * An indexed set is a mix between a set and a map. * Every element in this set has a unique [Key]. */
在一个类似 map 的结构中,每个键必须是唯一的,因为对相同的键 put 两次值,新值会代替旧值。通过上述方式,通过键的唯一性保证了上下文中的所有子上下文实例都是唯一的。
我们按照这个格式仿写一下然后反编译。
class MyElement :AbstractCoroutineContextElement(MyElement) { companion object Key : CoroutineContext.Key<MyElement>} //反编译的java文件public final class MyElement extends AbstractCoroutineContextElement { @NotNull public static final MyElement.Key Key = new MyElement.Key((DefaultConstructorMarker)null); public MyElement() { super((kotlin.coroutines.CoroutineContext.Key)Key); } public static final class Key implements kotlin.coroutines.CoroutineContext.Key { private Key() { } // $FF: synthetic method public Key(DefaultConstructorMarker $constructor_marker) { this(); } }}
@SinceKotlin("1.3")internal class CombinedContext( //左上下文 private val left: CoroutineContext, //右元素 private val element: Element) : CoroutineContext, Serializable { override fun <E : Element> get(key: Key<E>): E? { var cur = this while (true) { //如果输入 key 和右元素的 key 相同,则返回右元素 cur.element[key]?.let { return it } // 若右元素不匹配,则向左继续查找 val next = cur.left if (next is CombinedContext) { cur = next } else { // 若左上下文不是混合上下文,则结束递归 return next[key] } } } ......} public interface Element : CoroutineContext { public val key: Key<*> public override operator fun <E : Element> get(key: Key<E>): E? = @Suppress("UNCHECKED_CAST") // 如果给定键和元素本身键相同,则返回当前元素,否则返回空 if (this.key == key) this as E else null ......} public object EmptyCoroutineContext : CoroutineContext, Serializable { //返回空 public override fun <E : Element> get(key: Key<E>): E? = null}
internal class CombinedContext( //左上下文 private val left: CoroutineContext, //右元素 private val element: Element) : CoroutineContext, Serializable { public override fun minusKey(key: Key<*>): CoroutineContext { //如果element就是要删除的元素,返回left,否则说明要删除的元素在left中,继续从left中删除对应的元素 element[key]?.let { return left } //在左上下文中去掉对应元素 val newLeft = left.minusKey(key) return when { //如果left中不存在要删除的元素,那么当前CombinedContext就不存在要删除的元素,直接返回当前CombinedContext实例 newLeft === left -> this //如果left中存在要删除的元素,删除了这个元素后,left变为了空,那么直接返回当前CombinedContext的element就行 newLeft === EmptyCoroutineContext -> element //如果left中存在要删除的元素,删除了这个元素后,left不为空,那么组合一个新的CombinedContext返回 else -> CombinedContext(newLeft, element) } } ......} public object EmptyCoroutineContext : CoroutineContext, Serializable { public override fun minusKey(key: Key<*>): CoroutineContext = this ......} public interface Element : CoroutineContext { //如果key和自己的key匹配,那么自己就是要删除的Element,返回EmptyCoroutineContext(表示删除了自己),否则说明自己不需要被删除,返回自己 public override fun minusKey(key: Key<*>): CoroutineContext = if (this.key == key) EmptyCoroutineContext else this ......}
public operator fun plus(context: CoroutineContext): CoroutineContext =//如果要相加的CoroutineContext为空,那么不做任何处理,直接返回if (context === EmptyCoroutineContext) this else//如果要相加的CoroutineContext不为空,那么对它进行fold操作,可以把acc理解成+号左边的CoroutineContext,element理解成+号右边的CoroutineContext的某一个elementcontext.fold(this) { acc, element -> //首先从左边CoroutineContext中删除右边的这个element val removed = acc.minusKey(element.key) //如果removed为空,说明左边CoroutineContext删除了和element相同的元素后为空,那么返回右边的element即可 if (removed === EmptyCoroutineContext) element else { //如果removed不为空,说明左边CoroutineContext删除了和element相同的元素后还有其他元素,那么构造一个新的CombinedContext返回 val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } }
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ public final override val context: CoroutineContext = parentContext + this //重写了父类的coroutineContext属性 public override val coroutineContext: CoroutineContext get() = context}
public operator fun plus(context: CoroutineContext): CoroutineContext =//如果要相加的CoroutineContext为空,那么不做任何处理,直接返回if (context === EmptyCoroutineContext) this else//如果要相加的CoroutineContext不为空,那么对它进行fold操作,可以把acc理解成+号左边的CoroutineContext,element理解成+号右边的CoroutineContext的某一个elementcontext.fold(this) { acc, element -> //首先从左边CoroutineContext中删除右边的这个element val removed = acc.minusKey(element.key) //如果removed为空,说明左边CoroutineContext删除了和element相同的元素后为空,那么返回右边的element即可 if (removed === EmptyCoroutineContext) element else { //如果removed不为空,说明左边CoroutineContext删除了和element相同的元素后还有其他元素,那么构造一个新的CombinedContext返回 val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } }
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
val interceptor = object : ContinuationInterceptor { override val key: CoroutineContext.Key<*> = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> { println("intercept todo something. change run to thread") return object : Continuation<T> by continuation { override fun resumeWith(result: Result<T>) { println("create new thread") thread { continuation.resumeWith(result) } } } }} println(Thread.currentThread().name) lifecycleScope.launch(interceptor) { println("launch start. current thread: ${Thread.currentThread().name}") withContext(Dispatchers.Main) { println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}") } launch { println("new continuation todo something. current thread: ${Thread.currentThread().name}") } println("launch end. current thread: ${Thread.currentThread().name}")}
/******打印结果******/main// 第一次launchintercept todo something. change run to threadcreate new threadlaunch start. current thread: Thread-2new continuation todo something in the main thread. current thread: maincreate new thread// 第二次launchintercept todo something. change run to threadcreate new threadlaunch end. current thread: Thread-7new continuation todo something. current thread: Thread-8
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { //将可运行块的执行分派到给定上下文中的另一个线程上 public abstract fun dispatch(context: CoroutineContext, block: Runnable) //返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> //...... }
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true public abstract fun dispatch(context: CoroutineContext, block: Runnable) public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ......}
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // 创建新的CoroutineContext val oldContext = uCont.context val newContext = oldContext + context ...... //使用新的Dispatcher,覆盖外层 val coroutine = DispatchedCoroutine(newContext, uCont) block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() }}
internal class DispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>(context, uCont) { //在complete时会会回调 override fun afterCompletion(state: Any?) { // Call afterResume from afterCompletion and not vice-versa, because stack-size is more // important for afterResume implementation afterResume(state) } override fun afterResume(state: Any?) { ////uCont就是父协程,context仍是老版context,因此可以切换回原来的线程上 if (tryResume()) return // completed before getResult invocation -- bail out // Resume in a cancellable way because we have to switch back to the original dispatcher uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } ......}
public interface Continuation<in T> { //对应这个Continuation的协程上下文 public val context: CoroutineContext //恢复相应协程的执行,传递一个成功或失败的结果作为最后一个挂起点的返回值。 public fun resumeWith(result: Result<T>)} //将[value]作为最后一个挂起点的返回值,恢复相应协程的执行。@SinceKotlin("1.3")@InlineOnlypublic inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value)) //恢复相应协程的执行,以便在最后一个挂起点之后重新抛出[异常]。@SinceKotlin("1.3")@InlineOnlypublic inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = resumeWith(Result.failure(exception))
//CoroutineStart的invoke方法出现了Continuationpublic operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit // will start lazily}@InternalCoroutinesApipublic fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
最终回调到Continuation的resumeWith()恢复函数中。
public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result)}
我们再深入kotlin源码看一下其内部实现。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T>): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)//1 } }}
private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any?): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } } else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } }}
internal abstract class BaseContinuationImpl(...) { // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写 public final override fun resumeWith(result: Result<Any?>) { ... val outcome = invokeSuspend(param) ... } // 由编译生成的协程相关类来实现 protected abstract fun invokeSuspend(result: Result<Any?>): Any?}
前述的协程示例代码反编译为:
public static final Object test(@NotNull Continuation $completion) { Job var10000 = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { //挂起标识 Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); //设置挂起后恢复,进入的状态 this.label = 1; if (DelayKt.delay(11L, this) == var2) { return var2; } break; case 1: // 是否需要抛出异常 ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 3, (Object)null); return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;}
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable { // This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { //执行invokeSuspend内的代码块 val outcome = invokeSuspend(param) //如果代码块内执行了挂起方法,协程挂起,resumeWith执行结束,再次调用resumeWith时协程挂起点之后的代码才能继续执行 if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // 如果完成的completion也是BaseContinuationImpl,就会进入循环 current = completion param = outcome } else { // 执行completion resumeWith方法 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? .....}
下面看一下invokeSuspend的实现逻辑。
fun main(args: Array<String>) { val coroutineDispatcher = newSingleThreadContext("ctx") // 启动协程 1 GlobalScope.launch(coroutineDispatcher) { println("the first coroutine") async (Dispatchers.IO) { println("the second coroutine 11111") delay(100) println("the second coroutine 222222") }.await() println("the first coroutine end end end") } // 保证 main 线程存活,确保上面两个协程运行完成 Thread.sleep(500)}
前述示例编译成SuspendLambda子类的invokeSuspend方法为:
public final Object invokeSuspend(@NotNull Object $result) { //挂起函数返回标识SUSPEND_FLAG Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var3; boolean var4; //label默认初始值为0 switch(this.label) { case 0: ResultKt.throwOnFailure($result); CoroutineScope $this$launch = (CoroutineScope)this.L$0; var3 = "the first coroutine"; var4 = false; System.out.println(var3); //新建并启动 async 协程 Deferred var10000 = BuildersKt.async$default($this$launch, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { //挂起标识 Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var2; boolean var3; switch(this.label) { case 0: ResultKt.throwOnFailure($result); var2 = "the second coroutine 11111"; var3 = false; System.out.println(var2); this.label = 1; //判断是否执行delay挂起函数 if (DelayKt.delay(100L, this) == var4) { //挂起,跳出该方法 return var4; } break; case 1: ResultKt.throwOnFailure($result); // 恢复协程后再执行一次 resumeWith(),然后无异常的话执行最后的 println() break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var2 = "the second coroutine 222222"; var3 = false; System.out.println(var2); return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 2, (Object)null); //设置挂起后恢复时,进入的状态 this.label = 1; //调用await()挂起函数 if (var10000.await(this) == var5) { return var5; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var3 = "the first coroutine end end end"; var4 = false; System.out.println(var3); return Unit.INSTANCE;}
public final override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state)}
在 makeCompletingOnce 方法中,会根据 state 去处理协程状态,这里最终会走到ResumeAwaitOnCompletion.invoke 来恢复父协程,必要的话还会把 async 的结果给它。
private class ResumeAwaitOnCompletion<T>( private val continuation: CancellableContinuationImpl<T>) : JobNode() { override fun invoke(cause: Throwable?) { val state = job.state assert { state !is Incomplete } if (state is CompletedExceptionally) { // Resume with with the corresponding exception to preserve it continuation.resumeWithException(state.cause) } else { // resume 被挂起的协程 @Suppress("UNCHECKED_CAST") continuation.resume(state.unboxState() as T) } }}