Kotlin 协程在内部是如何工作的?

How do Kotlin coroutines work internally?

Kotlin 内部是如何实现协程的?

协程据说是线程的“轻量版”,我理解他们在内部使用线程来执行协程。

当我使用任何构建器函数启动协程时会发生什么?

这是我对运行这段代码的理解:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin 在开头有一个预定义的ThreadPool
  2. (A),Kotlin 开始在下一个可用的空闲线程(比如 Thread01)中执行协程。
  3. (B),Kotlin停止执行当前线程,并在下一个可用的空闲线程(Thread02)中启动挂起函数loadData()
  4. (B)returns执行后,Kotlin在下一个可用的空闲线程(Thread03)中继续协程
  5. (C)Thread03 上执行。
  6. (D)Thread03 停止。
  7. 1000 毫秒后,(E) 在下一个空闲线程上执行,比如 Thread01

我理解正确吗?还是协程以不同的方式实现?


2021 年更新:Manuel Vivo 的 Here's an excellent article 补充了以下所有答案。

协程与您描述的任何调度策略完全不同。协程基本上是 suspend fun 的调用链。暂停完全在您的控制之下:您只需调用 suspendCoroutine。你会得到一个回调对象,这样你就可以调用它的 resume 方法并回到你暂停的地方。

从下面的一些代码中可以看出,暂停是一种非常直接和透明的机制,完全在您的控制之下:

import kotlin.coroutines.*
import kotlinx.coroutines.*

var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    continuation!!.resume("Resumed first time")
    continuation!!.resume("Resumed second time")
}

suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}

上面的所有代码都在同一个主线程上执行。根本没有多线程。

launch 的协程在每次调用 suspendHere() 时都会自行挂起。它将继续回调写入 continuation 属性,然后您明确地使用该继续来恢复协程。

该代码使用 Unconfined 协程调度程序,它根本不调度到线程,它只是 运行 调用 continuation.resume().[=49 的协程代码=]


考虑到这一点,让我们重新审视一下您的图表:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin has a pre-defined ThreadPool at the beginning.

它可能有也可能没有线程池。 UI 调度程序使用单个线程。

线程成为协程调度程序目标的先决条件是有一个与之关联的并发队列,并且线程运行是一个顶级循环,从中获取Runnable个对象这个队列并执行它们。协程调度程序只是将延续放在该队列上。

  1. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).

它也可以是您调用 launch 的同一个线程。

  1. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).

Kotlin 无需停止任何线程即可挂起协程。事实上,协程的要点是线程不会启动或停止。该线程的顶级循环将继续并选择另一个 运行nable to 运行.

此外,您调用 suspend fun 这一事实没有任何意义。协程只会在显式调用 suspendCoroutine 时自行挂起。该功能也可以简单地return而不暂停。

但我们假设它确实调用了 suspendCoroutine。在那种情况下,协程不再 运行 宁 在任何线程 上。它被暂停并且无法继续,直到某处的某些代码调用 continuation.resume()。该代码可以 运行ning 在任何线程上,在未来的任何时间。

  1. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).

B 不会 "return after execution",协程仍在其主体内恢复。在 returning 之前,它可以暂停和恢复任意次数。

  1. (C) executes on Thread03.
  2. At (D), the Thread03 is stopped.
  3. After 1000ms, (E) is executed on the next free thread, say Thread01.

同样,没有线程被停止。协程被挂起,并使用一种通常特定于调度程序的机制来安排其在 1000 毫秒后恢复。届时它将被添加到与调度程序关联的 运行 队列中。


为了具体起见,让我们看一些示例,说明调度协程需要什么样的代码。

Swing UI 调度员:

EventQueue.invokeLater { continuation.resume(value) }

Android UI 调度员:

mainHandler.post { continuation.resume(value) }

ExecutorService 调度程序:

executor.submit { continuation.resume(value) } 

协同程序通过在可能的恢复点上创建一个开关来工作:

class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch(super.state) {
        default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
        case 0:  {
             // code before first suspension
             state = 1; // or something else depending on your branching
             break;
        }
        case 1: {
            ...
        }
        }
        return null;
    }
}

执行此协程的结果代码随后会创建该实例并在每次需要恢复执行时调用 doResume() 函数,处理方式取决于用于执行的调度程序。

下面是一个简单协程的编译示例:

launch {
    println("Before")
    delay(1000)
    println("After")
}

编译成这个字节码

private kotlinx.coroutines.experimental.CoroutineScope p$;

public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
   0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
   3: astore        5
   5: aload_0
   6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
   9: tableswitch   { // 0 to 1
                 0: 32
                 1: 77
           default: 102
      }
  32: aload_2
  33: dup
  34: ifnull        38
  37: athrow
  38: pop
  39: aload_0
  40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
  43: astore_3
  44: ldc           #26                 // String Before
  46: astore        4
  48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  51: aload         4
  53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  56: sipush        1000
  59: aload_0
  60: aload_0
  61: iconst_1
  62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
  65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
  68: dup
  69: aload         5
  71: if_acmpne     85
  74: aload         5
  76: areturn
  77: aload_2
  78: dup
  79: ifnull        83
  82: athrow
  83: pop
  84: aload_1
  85: pop
  86: ldc           #46                 // String After
  88: astore        4
  90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  93: aload         4
  95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
 101: areturn
 102: new           #54                 // class java/lang/IllegalStateException
 105: dup
 106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine
 108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
 111: athrow

我用 kotlinc 1.2.41 编译了这个

32到76是打印Before调用delay(1000)挂起的代码

从77到101是打印代码After

从 102 到 111 是非法恢复状态的错误处理,如开关 table 中的 default 标签所示。

总而言之,kotlin 中的协程只是由一些调度程序控制的状态机。