协程的基础设施
约 2463 字大约 8 分钟
2025-06-15
一个 Kotlin 协程
简单协程的创建
fun main() {
val suspendFunction0: suspend () -> Int = suspend {
println("In Coroutine.")
666 // 协程体的返回值
}
val createCoroutine: Continuation<Unit> = suspendFunction0.createCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("Coroutine End: $result")
}
})
// 启动这个协程
createCoroutine.resume(Unit)
// 运行结果
// In Coroutine.
// Coroutine End: Success(666)
}
标准库中提供了一个createCoroutine
函数,我们可以通过它来创建协程,不过这个协程并不会立即执行
我们先来看看它的声明:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>
- 其中
suspend () -> T
是一个被suspend修饰的挂起函数,这也是协程的执行体。 - 参数
completion
会在协程执行完成后调用,也就是协程的完成回调。 - 返回值是一个
Continuation对象
,由于现在协程仅仅被创建出来,因此需要通过这个值在之后触发协程的启动。
协程的启动
调用continuation.resume(Unit)
之后,协程体会立即执行
一般来讲,我们创建协程后就会立即让它开始执行,因此标准库也提供了一个一步到位的API——startCoroutine
。它与createCoroutine
除了返回值类型不同之外,剩下的完全一致。
fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>)
协程体的Receiver
与协程的创建和启动相关的API一共有两组,除了前两节探讨的一组以外,还有一组:
fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
)
仔细观察可以发现,就是多了一个receiver参数,这个receiver参数可以为协程体提供一个作用域,在协程体内我们可以直接使用作用域内提供的函数或者状态等。
为了方便使用带有Receiver的协程API,我们可以封装一个用以启动协程的函数launchCoroutine
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}
override val context = EmptyCoroutineContext
})
}
使用时先创建一个作用域,可以定义一个ProducerScope类来创建一个协程的作用域
// @RestrictsSuspension
class ProducerScope<T> {
suspend fun produce(value: T) {}
}
fun main() {
launchCoroutine(ProducerScope<Int>()) {
// 如果我们为Receiver对应的类型增加一个RestrictsSuspension注解,那么在它的作用下,协程体内就无法调用外部的挂起函数了
// delay(1000L)
println(produce(0))
}
}
作用域可以用来提供函数支持,自然也就可以用来增加限制。如果我们为Receiver对应的类型增加一个RestrictsSuspension
注解,那么在它的作用下,协程体内就无法调用外部的挂起函数了,也就是说如果调用delay函数就会出错。
函数的挂起
我们已经知道使用suspend关键字修饰的函数叫作挂起函数,挂起函数只能在协程体内或其他挂起函数内调用。这样一来,整个Kotlin语言体系内的函数就分为两派:普通函数
和挂起函数
。其中挂起函数
可以调用任何函数
,普通函数
只能调用普通函数
。
但是,需要注意的是,挂起函数
不一定真的会挂起
,只是提供了挂起的条件。那什么时候才会挂起呢?
挂起函数
是否会挂起
,取决于resume函数与对应的挂起函数的调用是否在相同的调用栈帧
上,切换函数调用栈帧的方法可以是切换到其他线程上执行,也可以是不切换线程但在当前函数返回之后的某一个时刻再执行。通俗来说只有在挂起函数返回之后调用Continuation.resume()
才挂起。
suspend fun notSuspend() = suspendCoroutine<Int> { continuation ->
println(100)
continuation.resume(100)
}
// suspendCoroutine 是 Kotlin 中用于挂起协程并与外部世界进行交互的函数之一。它的主要作用是允许你在协程中将异步非挂起的操作转化为挂起操作
suspend fun needSuspend() = suspendCoroutine<Int> { continuation ->
thread {
println(200)
// resumeWith 更通用,能处理成功和失败两种情况;resume 只能处理成功。
continuation.resume(200)
// continuation.resumeWith(Result.success(200))
}
}
// 反编译Java代码
public final class SuspendDemoKt {
@Nullable
public static final Object notSuspend(@NotNull Continuation $completion) {
SafeContinuation var2 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation continuation = (Continuation)var2;
int var4 = 0;
byte var5 = 100;
System.out.println(var5);
Result.Companion var10001 = Result.Companion;
continuation.resumeWith(Result.constructor-impl(Boxing.boxInt(100)));
Object var10000 = var2.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
@Nullable
public static final Object needSuspend(@NotNull Continuation $completion) {
SafeContinuation var2 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
final Continuation continuation = (Continuation)var2;
int var4 = 0;
ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, new Function0() {
public final void invoke() {
short var1 = 200;
System.out.println(var1);
Result.Companion var10001 = Result.Companion;
continuation.resumeWith(Result.constructor-impl(200));
}
// $FF: synthetic method
// $FF: bridge method
public Object invoke() {
this.invoke();
return Unit.INSTANCE;
}
}, 31, (Object)null);
Object var10000 = var2.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
}
挂起函数如果需要挂起则需要通过suspendCoroutine
来获取Continuation
实例。我们已经知道它是协程体,但是这个实例是怎么传进来的呢?
- 在反编译Java代码中清晰地看到,
suspend()->Int
类型的函数在Java语言看来实际上是(Continuation<*>)->Object
类型的。这正好与我们经常写的异步回调的方法类似,传一个回调进去等待结果返回就好了。
那么它的直接返回值是个啥?这要分两种情况讨论:
- 挂起函数同步返回。作为参数传入的Continuation的resumeWith不会被调用,函数的实际返回值就是它作为挂起函数的返回值。notSuspend尽管看起来似乎调用了
resumeWith
,不过调用对象是SafeContinuation
,因此它的实现属于同步返回。 - 挂起函数挂起,执行异步逻辑。此时函数的实际返回值是一个挂起标志,通过这个标志外部协程就可以知道该函数需要挂起等到异步逻辑执行。在Kotlin中这个标志是个常量,定义在Intrinsics.kt当中。
现在大家知道了原来挂起函数就是普通函数的参数中多了一个Continuation实例,难怪挂起函数总是可以调用普通函数,普通函数却不可以调用挂起函数。
协程的上下文
协程上下文本质是一个集合类
,储存了协程有关的元数据,被保存在协程实例 Continuation 中。用法上类似于 Map,每一个数据都有一个 Key。不同点在于 Key 定义在数据内部,也就是说每一条数据在加入上下文集合之前就已经确定了自己的 Key。
协程异常处理器的实现
class CoroutineExceptionHandler(val onErrorAction: (Throwable) -> Unit) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
fun onError(error: Throwable) {
onErrorAction(error)
}
}
不难看出,方便起见,Key 被设计成一个泛型接口,它的实参一般就是 item 的类型。如此一来就不必维护一堆常量用来索引了。
协程上下文的使用
fun main() {
var cc: CoroutineContext = EmptyCoroutineContext
cc += CoroutineExceptionHandler {
println("error: $it")
}
suspend {
// 在协程内部可以通过coroutineContext这个全局属性直接获取当前协程的上下文,它也是标准库中的API
println("In Coroutine [${coroutineContext[CoroutineExceptionHandler]}].")
throw RuntimeException("custom exception")
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = cc // 也可以直接使用CoroutineExceptionHandler的实例
override fun resumeWith(result: Result<Int>) {
result.onFailure {
// 既然上下文是个集合,那么使用自然也和集合类似。
context[CoroutineExceptionHandler]?.onError(it)
}
println("resumeWith")
}
})
}
协程的拦截器
在Continuation和协程上下文的基础上,标准库又提供了一个叫作拦截器Interceptor
的组件,它允许我们拦截协程异步回调时的恢复调用。既然可以拦截恢复调用,那么想要操纵协程的线程调度应该不是什么难事。
打印日志的拦截器
class LogInterceptor : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith: $result")
continuation.resumeWith(result)
println("after resumeWith.")
}
}
拦截器也是协程上下文的一个类型(实现了CoroutineContext.Element
接口),它的核心方法interceptContinuation
接收要恢复的协程,把它包装成一个新的实例返回,在这个新协程中可以执行特定的操作发挥拦截器的价值。
注意在新的协程中要调用传入协程的resume
相关方法,否则就成了真正意义上的拦截器了(协程的恢复被拦截,原协程无法继续运行)。
既然拦截器也是一个上下文,那么使用方法自然也和上下文一样
suspend {
needSuspend()
0
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = LogInterceptor()
override fun resumeWith(result: Result<Int>) {
println("resumeWith")
}
})
// 运行结果
// before resumeWith: Success(kotlin.Unit)
// after resumeWith.
// 200
// before resumeWith: Success(200)
// resumeWith
// after resumeWith.
协程体中调用了needSuspend
挂起函数,因此除了首次启动时触发一次拦截,之后从挂起函数恢复调用处又分别执行了一次拦截。
拦截器的执行细节
那么拦截器包装函数是怎样执行的?它在创建协程的时候就执行了!也就是说协程创建出来,SafeContinuation 里面包裹的不再是协程体,而是经过拦截器包装的 Continuation。
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
通过查看协程的代码,发现它调了一下 Continuation.intercepted()
,也就是最终调用的是ContinuationInterceptor
的实现。 和我们自己写的手动使用协程上下文几乎一样,通过 Key 从 context 中读取拦截器,返回包装后的结果。若没有拦截器就返回自身。