【Kotlin】协程技术浅谈

【Kotlin】协程技术浅谈

本文从更全面的角度介绍Kotlin协程的设计思想,其相对线程有哪些更友好的优化

之前写过协程api介绍和核心的挂起恢复原理,再次对其设计思想进行记录,以从更上层的思维模型构筑方面了解 Kotlin 语言的协程。

已有线程为何要使用协程呢

在 JVM 生态系统中,已经有了 Thread 这个设计,对异步计算进行建模的抽象。

但是,JVM 直接映射到 OS 线程的 线程很重 。对于每个线程,OS 必须在堆栈上 分配大量上下文信息 。此外,每次计算达到 阻塞 操作时,底层线程都会暂停,JVM 必须加载另一个线程的上下文。上下文切换成本高昂,因此我们应避免在代码中使用阻塞操作。

JVM 线程上下文(Thread Context)指的是在 JVM 中每个线程所拥有的一组信息,这些信息定义了 线程在运行时的环境和状态 。包含: (1)程序计数器(Program Counter,PC):用于记录线程当前执行的字节码指令地址。当线程被暂停后恢复执行时,程序计数器能让线程知道从哪里继续执行。 (2)栈帧(Stack Frame):线程的栈内存用于存储方法调用的信息,每个方法调用都会在栈上创建一个栈帧。栈帧包含局部变量表、操作数栈、动态链接、方法返回地址等信息。 (3)线程局部存储(Thread Local Storage,TLS):允许线程拥有自己独立的变量副本,不同线程对这些变量的操作互不影响。 (4)寄存器状态:包括 CPU 寄存器的值,如通用寄存器、指令指针寄存器等。这些寄存器的值反映了线程当前的执行状态。 (5)线程优先级:决定了线程在竞争 CPU 资源时的优先顺序。 (6)线程状态:如新建、就绪、运行、阻塞、终止等状态。

另一方面,正如我们将看到的,协程非常轻量级。它们不是直接映射到操作系统线程上,而是在用户级别,使用称为 Continuation 的简单对象。在协程之间切换不需要操作系统加载另一个线程的上下文,而是切换对 Continuation 对象的引用。

采用协程的另一个很好的理由是它们是一种 以同步方式编写异步代码 的方法。

作为替代方案,我们可以使用回调。但是,回调不太优雅,而且不可组合。此外,很难推理它们。很容易陷入回调地狱,代码难以阅读和维护:

a(aInput) { resultFromA ->
  b(resultFromA) { resultFromB ->
    c(resultFromB) { resultFromC ->
      d(resultFromC) { resultFromD ->
        println("A, B, C, D: $resultFromA, $resultFromB, $resultFromC, $resultFromD")
      }
    }
  }
}

上面的例子展示了使用回调风格执行四个函数。我们可以看出,收集四个函数返回的四个值需要很多工作。而且,代码还有很多优化空间,可以变得易于阅读和维护一些。

异步编程中使用的另一种模型是响应式编程。然而,问题在于它需要生成更复杂的代码才能理解和维护。让我们以 RxJava 库官方文档中的以下代码片段为例:

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

上述代码模拟了在后台线程上运行某些计算和网络请求,并在 UI 线程上显示结果(或错误)。它不是自解释的,并不能立即看懂每个方法的作用是什么,我们需要熟悉该库才能理解发生了什么。

协程解决了上述所有问题。让我们看看它是如何解决的。

suspend挂起函数

首先,你可以将协程视为轻量级线程,这意味着它不直接映射到操作系统线程。它是一种可以 随时暂停和恢复 的计算任务。因此,在开始了解如何构建协程之前,我们需要了解如何暂停和恢复协程。

Kotlin 提供了 suspend 关键字来标记可以暂停协程的函数,即允许它暂停并稍后恢复:

suspend fun bathTime() {
  logger.info("Going to the bathroom")
  delay(500L)
  logger.info("Exiting the bathroom")
}

delay(timeMillis: Long) 函数是suspend函数,会暂停协程500ms。

suspend函数只能从协程或其他suspend函数调用 。它可以被暂停和恢复。在上面的例子中,bathTime函数里,当协程执行到了delay函数时,batchTime函数可以被暂停。一旦delay执行完毕,batchTime恢复,其将从暂停后立即执行的行继续执行。

上述机制完全在 Kotlin 运行时中实现,但它是如何实现的呢?无需深入研究协程的内部结构,suspend function的整个上下文保存在类型为 的对象中 Continuation<T> 。T类型变量表示函数的返回类型。

Continuation 包含函数变量和参数的所有状态。此外,它还包括 一个标签 ,用于存储执行暂停的点。因此, Kotlin 编译器将重写每个suspend function ,在函数签名中添加一个 Continuation 类型的参数。我们的函数签名bathTime将被重写如下:

fun bathTime(continuation: Continuation<*>): Any

为什么编译器还要 改变返回值类型 为 Any 呢?答案是,当函数suspend被挂起时,它不能直接返回函数的值。它必须返回一个值来标记该函数被挂起COROUTINE_SUSPENDED,这样调用方才知道自己调用了一个挂起函数,需要在这里暂停自身的执行。

在 continuation 对象内部,编译器将保存函数执行的状态。由于我们没有参数,也没有内部变量,因此 continuation 仅存储标记 执行进度的标签 。为了简单起见,我们引入一个 BathTimeContinuation 类型来存储函数的上下文。

在我们的示例中,运行时可以bathTime在函数开始时或delay函数之后调用该函数。如果我们使用Int标签,则可以表示函数的两种可能状态,如下所示:

fun bathTime(continuation: Continuation<*>): Any {
    val continuation =
      continuation as? BathTimeContinuation ?: BathTimeContinuation(continuation)
    if (continuation.label == 0) {
      logger.info("Going to the bathroom")
      continuation.label = 1
      if (delay(500L, continuation) == COROUTINE_SUSPENDED) 
          return COROUTINE_SUSPENDED
    }
    if (continuation.label == 1) {
      logger.info("Exiting the bathroom")
    }
    error("This line should never be reached")
}

首先,必须检查continuation对象是否是 BathTimeContinuation 类型。如果不是,我们创建一个新BathTimeContinuation对象,并将该continuation对象作为参数传递。

当bathTime第一次调用该函数时,我们会创建一个新的continuation实例。正如我们所见,continuation就像一层层的洋葱:每次调用suspend function时,我们都会将 continuation 对象包装在一个新的 continuation 中。

然后,如果label是0,我们打印第一条消息并将标签设置为1。然后,我们调用该delay函数,传递continuation对象。如果delay函数返回COROUTINE_SUSPENDED,则表示该函数已暂停,我们也返回COROUTINE_SUSPENDED给调用者。

假设delay函数返回的值不同于COROUTINE_SUSPENDED。在这种情况下,这意味着函数已恢复,我们可以继续执行该bathTime函数。如果标签是1,则该函数刚刚恢复,我们打印第二条消息。

以上是 Kotlin 编译器生成并由 Kotlin 运行时运行的实际代码的简化版本。不过,这足以理解协程的工作原理。

简单来说,就是 将需要执行的代码封装在Continuation对象 中,并将其传递给JVM运行时。运行时将检查该Continuation对象是否已暂停。如果是,则运行时将暂停该函数的执行,并将该待执行的Continuation对象传递给该函数,作为一种回调。进入到下一层的挂起函数中,重复这个检查,直接遇到非 COROUTINE_SUSPENDED 状态的返回值,这时候运行时将一层层地恢复该函数的执行。

协程作用域和结构并发

现在我们可以开始研究 Kotlin 如何实现结构并发的概念。让我们声明另一个suspend function,它将模拟煮沸一些水的动作:

suspend fun boilingWater() {
    logger.info("Boiling water")
    delay(1000L)
    logger.info("Water boiled")
}

我们介绍的第一个函数是 coroutineScope 挂起函数。此函数是协程的核心,用于创建新的协程作用域。它以挂起 lambda 作为参数,以 CoroutineScope 的实例作为接收者:

suspend fun <R> coroutineScope(
  block: suspend CoroutineScope.() -> R
): R

协程作用域代表了 Kotlin 中结构化并发的实现。 运行时会阻塞 lambda 的执行,block直到 lambda 内部启动的所有协程block都完成 。这些协程被称为作用域的子协程。此外,结构化并发还为我们带来了以下特性:

  • 子协程继承父协程的上下文 (CoroutineContext),并且可以覆盖它。协程的上下文是Continuation我们之前见过的对象的一部分。它包含协程的名称、调度程序(即执行协程的线程池)、异常处理程序等。
  • 当父协程被取消时,子协程也会被取消。
  • 当子协程抛出异常时,父协程也会停止。

此外,该coroutineScope函数还创建了一个新的协程,它会暂停前一个协程的执行,直到其执行结束。因此,如果我们想按顺序执行晨间例程的两个步骤,我们可以使用以下代码:

suspend fun sequentialMorningRoutine() {
  coroutineScope {
    bathTime()
  }
  // coroutineScope会挂起当前协程,等bathTime走完才会往下执行
  coroutineScope {
    boilingWater()
  }
}

为了执行sequentialMorningRoutine,我们必须声明一个暂停main函数,我们将在本文的其余部分重复使用该函数:

suspend fun main() {
    logger.info("Starting the morning routine")
    sequentialMorningRoutine()
    logger.info("Ending the morning routine")
}

该sequentialMorningRoutine函数将按顺序执行该bathTime函数,然后boilingWater在两个不同的协程中执行该函数。因此,我们不应该对上述代码的输出类似于以下内容感到惊讶:

15:27:05.260 [main] INFO CoroutinesPlayground - Starting the morning routine
15:27:05.286 [main] INFO CoroutinesPlayground - Going to the bathroom
15:27:05.811 [kotlinx.coroutines.DefaultExecutor] INFO CoroutinesPlayground - Exiting the bathroom
15:27:05.826 [kotlinx.coroutines.DefaultExecutor] INFO CoroutinesPlayground - Boiling water
15:27:06.829 [kotlinx.coroutines.DefaultExecutor] INFO CoroutinesPlayground - Water boiled
15:27:06.830 [kotlinx.coroutines.DefaultExecutor] INFO CoroutinesPlayground - Ending the morning routine

我们可以看到,执行是纯顺序的。但是,我们可以看到运行时使用两个不同的线程来执行整个过程,即 mainkotlinx.coroutines.DefaultExecutor 线程。协程的一个重要特性是, 当它们恢复时,它们可以在与暂停它们的线程不同的线程中执行 。例如,bathTime协程在 main 主线程上启动。然后,delay函数将其暂停。最后,它在 kotlinx.coroutines.DefaultExecutor 线程上恢复。

协程构建器

launch Builder

至此,我们应该了解suspend function和结构并发的基础知识。现在是时候明确创建我们的第一个协程了。Kotlin 协程库提供了一组称为 builders 的函数。这些函数用于创建协程并开始执行。我们将看到的第一个函数是launch:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

该库将 launch 构建器定义为 CoroutineScope 类型的扩展函数。因此,我们需要一个作用域来以这种方式创建协程。要创建协程,我们还需要一个CoroutineContext和一个包含要执行的代码的 lambda。构建器将把它作为接收器传递CoroutineScope给blocklambda。这样,我们可以重用作用域来创建新的子协程。最后,构建器的默认行为是立即启动新的协程(CoroutineStart.DEFAULT)。

因此,让我们在早晨的例行工作中添加一些并发功能。我们可以在两个新的协程中启动boilingWater和bathTime函数,并观察它们的竞争情况:

suspend fun concurrentMorningRoutine() {
    coroutineScope {
        launch {
            bathTime()
        }
        launch {
            boilingWater()
        }
    }
}

上述代码的日志类似于以下内容:

09:09:44.817 [main] INFO CoroutinesPlayground - Starting the morning routine
09:09:44.870 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Going to the bathroom
09:09:44.871 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Boiling water
09:09:45.380 [DefaultDispatcher-worker-2 @coroutine#1] INFO CoroutinesPlayground - Exiting the bathroom
09:09:45.875 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Water boiled
09:09:45.876 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Ending the morning routine

我们可以从上面的日志中提取出很多信息。首先,我们可以看到我们有效地产生了两个新的协程,coroutine#1和coroutine#2。第一个运行bathTime挂起函数,第二个运行boilingWater。

两个函数的日志是交错的,因此两个函数的执行是并发的。这种并发模型是协作的。只有coroutine#1遇到suspend函数,暂停执行时,coroutine#2才有机会执行。

此外,coroutine#1在线程上运行 DefaultDispatcher-worker-1 暂停,而在 DefaultDispatcher-worker-2线程上恢复。协程在可配置的线程池上运行。正如日志所建议的那样,默认线程池被称为Dispatchers.Default。

最后但并非最不重要的一点是,日志显示了结构并发的一个清晰示例。执行main在两个协程执行后打印了方法中的最后一条日志。我们可能已经注意到,我们没有任何显式同步机制来实现main函数中的这一结果。我们没有等待或延迟main函数的执行。正如我们所说,这是由于结构并发。该coroutineScope函数创建一个用于创建两个协程的作用域。由于这两个协程是同一作用域的子代,因此它将等到它们两个的执行结束才返回。

我们也可以避免使用结构化并发。在这种情况下,我们需要添加一些等待协程执行结束的操作。我们可以使用GlobalScope对象而不是 coroutineScope 函数。它就像一个空的协程作用域,不强制任何父子关系。因此,我们可以重写晨间例程函数,如下所示:

suspend fun noStructuralConcurrencyMorningRoutine() {
    GlobalScope.launch {
        bathTime()
    }
    GlobalScope.launch {
        boilingWater()
    }
    Thread.sleep(1500L)
}

上述代码的日志与前一个代码大体相同:

14:06:57.670 [main] INFO CoroutinesPlayground - Starting the morning routine
14:06:57.755 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Boiling water
14:06:57.755 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Going to the bathroom
14:06:58.264 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Exiting the bathroom
14:06:58.763 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Water boiled
14:06:59.257 [main] INFO CoroutinesPlayground - Ending the morning routine

由于我们没有使用任何结构化的并发机制GlobalScope,我们Thread.sleep(1500L)在函数末尾添加了一个,以等待两个协程的执行结束。如果我们删除该Thread.sleep调用,日志将类似于以下内容:

21:47:09.418 [main] INFO CoroutinesPlayground - Starting the morning routine
21:47:09.506 [main] INFO CoroutinesPlayground - Ending the morning routine

正如预期的那样,主函数在两个协程执行结束之前就返回了。因此,我们可以说,GlobalScope不是创建协程的好选择

如果我们查看该 launch 函数的定义,我们可以看到它返回一个Job对象。该对象是 coroutine 的句柄。我们可以使用它来取消协程的执行或等待其完成。让我们看看如何使用它来等待协程的完成。让我们为我们的钱包添加一个新的suspend function:

suspend fun preparingCoffee() {
    logger.info("Preparing coffee")
    delay(500L)
    logger.info("Coffee prepared")
}

在我们的早晨例行工作中,我们只想在洗澡和烧水后准备咖啡。因此,我们需要等待两个协程的完成。我们可以通过join在结果Job对象上调用方法来做到这一点,join方法是一个suspend函数,可以用于等待协程的block完全执行完毕,代码如下:

suspend fun morningRoutineWithCoffee() {
    coroutineScope {
        val bathTimeJob: Job = launch {
            bathTime()
        }
        val  boilingWaterJob: Job = launch {
            boilingWater()
        }
        bathTimeJob.join()
        boilingWaterJob.join()
        launch {
            preparingCoffee()
        }
    }
}

正如我们所料,从日志中我们可以看到,在两个协程执行结束后,我们才准备了咖啡:

21:56:18.040 [main] INFO CoroutinesPlayground - Starting the morning routine
21:56:18.128 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Going to the bathroom
21:56:18.130 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Boiling water
21:56:18.639 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Exiting the bathroom
21:56:19.136 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Water boiled
21:56:19.234 [DefaultDispatcher-worker-2 @coroutine#3] INFO CoroutinesPlayground - Preparing coffee
21:56:19.739 [DefaultDispatcher-worker-2 @coroutine#3] INFO CoroutinesPlayground - Coffee prepared
21:56:19.739 [DefaultDispatcher-worker-2 @coroutine#3] INFO CoroutinesPlayground - Ending the morning routine

但是,既然我们现在知道了结构并发的所有秘密,我们可以 使用coroutineScope函数 的功能重写上述代码:

suspend fun structuralConcurrentMorningRoutineWithCoffee() {
    coroutineScope {
        coroutineScope {
            launch {
                bathTime()
            }
            launch {
                boilingWater()
            }
        }
        launch {
            preparingCoffee()
        }
    }
}

上述代码的输出和前一个代码相同。

async Builder

如果我们想从协程的执行中返回一个值怎么办?例如,让我们定义两个新的挂起函数:前者产生我们准备的咖啡混合物。同时,后者返回烤面包:

suspend fun preparingJavaCoffee(): String {
  logger.info("Preparing coffee")
  delay(500L)
  logger.info("Coffee prepared")
  return "Java coffee"
}

suspend fun toastingBread(): String {
  logger.info("Toasting bread")
  delay(1000L)
  logger.info("Bread toasted")
  return "Toasted bread"
}

幸运的是,库提供了一种让协程返回值的方法。我们可以使用 async构建器 创建一个返回值的协程。具体来说,它会产生一个 Deferred<T> 类型的值,其行为或多或少类似于 java Future<T> 。在 Deferred<T> 类型的对象上,我们可以调用 await方法 等待协程完成并获取返回值。库还将async构建器定义为 CoroutineScope 扩展方法:

public fun <T> CoroutineScope.async(
  context: CoroutineContext = EmptyCoroutineContext,
  start: CoroutineStart = CoroutineStart.DEFAULT,
  block: suspend CoroutineScope.() -> T
): Deferred<T>

让我们看看如何使用它来返回我们准备的咖啡和烤面包的混合:

suspend fun breakfastPreparation() {
    coroutineScope {
        val coffee: Deferred<String> = async {
            preparingJavaCoffee()
        }
        val toast: Deferred<String> = async {
            toastingBread()
        }
        logger.info("I'm eating ${coffee.await()} and ${toast.await()}")
    }
}

如果我们查看日志,我们可以看到两个协程的执行仍然是并发的。最后一条日志等待两个协程完成后打印,最终消息:

21:56:46.091 [main] INFO CoroutinesPlayground - Starting the morning routine
21:56:46.253 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Preparing coffee
21:56:46.258 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Toasting bread
21:56:46.758 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Coffee prepared
21:56:47.263 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Bread toasted
21:56:47.263 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - I'm eating Java coffee and Toasted bread
21:56:47.263 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Ending the morning routine

协作调度

到这里,我们应该对协程的一些基础知识有所了解了。然而,我们仍然需要讨论协程的一个重要方面:协作调度。

协程调度模型与 Java 采用的Threads抢占式调度模型有很大不同。在抢占式调度中,操作系统决定何时从一个线程切换到另一个线程。在协作式调度中, 协程本身决定何时将控制权交给另一个协程

在 Kotlin 中,协程决定放弃控制权并到达挂起函数。只有此时执行它的线程才会被释放并允许运行另一个协程。

如果我们注意到,在迄今为止看到的日志中,执行控制在调用delay挂起函数时总是会发生变化。但是,为了更好地理解它,让我们看另一个示例。让我们定义一个新的挂起函数来模拟执行一个非常长时间运行的任务:

suspend fun workingHard() {
    logger.info("Working")
    while (true) {
        // Do nothing
    }
    delay(100L)
    logger.info("Work done")
}

无限循环会阻止函数到达delay挂起函数,因此协程永远不会放弃控制权。现在,我们定义另一个挂起函数与前一个函数并发执行:

suspend fun takeABreak() {
    logger.info("Taking a break")
    delay(1000L)
    logger.info("Break done")
}

最后,让我们将所有内容整合到一个新的挂起函数中,该函数在两个专用协程中运行前两个函数。为了确保我们能看到协作调度的效果,我们将执行协程的线程池限制为单个线程:

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun workingHardRoutine() {
  val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(1)
  coroutineScope {
    launch(dispatcher) {
      workingHard()
    }
    launch(dispatcher) {
      takeABreak()
    }
  }
}

表示CoroutineDispatcher用于执行协程的线程池。该limitedParallelism函数是接口的扩展方法CoroutineDispatcher,用于 将线程池中的线程数限制为给定值 。由于这是一个实验性 API,因此我们需要用@OptIn(ExperimentalCoroutinesApi::class)注释注释该函数以避免编译器警告。

我们在唯一可用的线程上启动了两个协程dispatcher,日志向我们展示了协作调度的效果:

08:46:04.804 [main] INFO CoroutinesPlayground - Starting the morning routine
08:46:04.884 [DefaultDispatcher-worker-2 @coroutine#1] INFO CoroutinesPlayground - Working
-- Running forever --

由于workingHard协程从未到达挂起函数,因此它永远不会交出控制权。然后,takeABreak协程永远不会被执行。相反,如果我们定义一个挂起函数,将控制权交还给调度程序,takeABreak协程将有机会被执行:

suspend fun workingConsciousness() {
    logger.info("Working")
    while (true) {
        delay(100L)
    }
    logger.info("Work done")
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun workingConsciousnessRoutine() {
  val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(1)
  coroutineScope {
    launch(dispatcher) {
      workingConsciousness()
    }
    launch(dispatcher) {
      takeABreak()
    }
  }
}

现在,日志显示takeABreak协程 有机会执行 ,即使 workingConsciousness 永远运行,并且我们只有一个线程:

09:02:49.302 [main] INFO CoroutinesPlayground - Starting the morning routine
09:02:49.376 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Working
09:02:49.382 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Taking a break
09:02:50.387 [DefaultDispatcher-worker-1 @coroutine#2] INFO CoroutinesPlayground - Break done
-- Running forever --

我们可以使用协程来获取相同的日志workingHard,并向线程池中添加一个线程:

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun workingHardRoutine() {
  val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(2)
  coroutineScope {
    launch(dispatcher) {
      workingHard()
    }
    launch(dispatcher) {
      takeABreak()
    }
  }
}

由于我们有两个线程和两个协程,因此并发度现在为 2。照例,日志证实了该理论:coroutine#1在 上执行DefaultDispatcher-worker-1,coroutine#2在 上执行DefaultDispatcher-worker-2。

13:40:59.864 [main] INFO CoroutinesPlayground - Starting the morning routine
13:40:59.998 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Working
13:41:00.003 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Taking a break
13:41:01.010 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Break done
-- Running forever --

协作式调度迫使我们在设计协程时非常小心。假设一个协程执行了一个阻塞底层线程的操作,比如 JDBC 调用。在这种情况下,它会阻止该线程执行任何其他协程。

因此,该库允许我们针对不同的操作使用不同的调度程序。主要有:

  • Dispatchers.Default是库使用的默认调度程序。它使用线程数等于可用处理器数的线程池。它是 CPU 密集型操作的正确选择。
  • Dispatchers.IO是用于 I/O 操作的调度程序。它使用线程池,线程数等于可用处理器数,或最多 64 个。它是 I/O 操作(例如网络调用或文件操作)的正确选择。
  • 从线程池创建的 Dispatcher:可以CoroutineDispatcher使用线程池来创建我们的实例。我们可以轻松使用接口asCoroutineDispatcher的扩展功能Executor。但是,请注意,当我们不再需要底层线程池时,我们有责任将其关闭:val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

如果我们同时拥有 CPU 密集型部分和阻塞部分,我们必须同时使用Dispatchers.DefaultDispatchers.IO ,并确保在默认调度程序上启动 CPU 密集型协程,在 IO 调度程序上启动阻塞代码。

协程的取消

当我们思考并发编程时,取消始终是一个棘手的话题。终止线程并突然停止任务的执行并不是一个好的做法。在停止任务之前,我们必须释放正在使用的资源,避免泄漏,并使系统处于一致状态。

我们可以想象,Kotlin 允许我们取消协程的执行。该库提供了一种机制来协作取消协程以避免出现问题。该Job类型提供了一个cancel取消协程执行的函数。但是,取消不是立即的,只有当协程到达暂停点时才会发生。该机制与我们在协作调度中看到的机制非常接近。

让我们看一个例子。我们想模拟一下我们在工作期间接到一个重要电话。我们忘记了我们最好的朋友的生日,我们想在商场关门前去买一份礼物:

suspend fun forgettingTheBirthDayRoutine() {
  coroutineScope {
    val workingJob = launch {
      workingConsciousness()
    }
    launch {
      delay(2000L)
      workingJob.cancel()
      workingJob.join()
      logger.info("I forgot the birthday! Let's go to the mall!")
    }
  }
}

此代码片段中发生了很多事情。首先,我们启动了workingConsciousness协程并收集了相应的Job。我们使用了workingConsciousness挂起函数,因为它在无限循环内挂起,并调用该delay函数。

同时,我们启动另一个协程,该协程workingJob在 2 秒后调用 workingJob 的取消函数,并等待其完成。workingJob被取消,但workingConsciousness协程不会立即停止。它继续执行,直到 到达暂停点 ,然后被取消。由于我们想等待取消,我们调用了workingJob的join函数。

日志证实了这一理论。在 coroutine#1 启动后约 2 秒,coroutine#2打印了其日志,并且coroutine#1被取消:

21:36:04.205 [main] INFO CoroutinesPlayground - Starting the morning routine
21:36:04.278 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Working
21:36:06.390 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - I forgot the birthday! Let's go to the mall!
21:36:06.391 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Ending the morning routine

cancel和join配合使用的模式非常常见,因此 Kotlin 协程库为我们提供了一个cancelAndJoin结合这两种操作的函数。

正如我们所说,在 Kotlin 中,取消是一种合作行为。 如果协程从不暂停,则根本无法取消 。让我们改用suspend function来更改上述示例workingHard。在这种情况下,该workingHard函数从不暂停,因此我们预计workingJob无法取消:

suspend fun forgettingTheBirthDayRoutineWhileWorkingHard() {
    coroutineScope {
        val workingJob = launch {
            workingHard()
        }
        launch {
            delay(2000L)
            workingJob.cancelAndJoin()
            logger.info("I forgot the birthday! Let's go to the mall!")
        }
    }
}

这次,我们的朋友将不会收到她的礼物。workingJob被取消,但workingHard函数没有停止,因为它从未到达暂停点。 日志再次证实了这一理论:

08:56:10.784 [main] INFO CoroutinesPlayground - Starting the morning routine
08:56:10.849 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Working
-- Running forever --

在后台,该cancel函数将 设置Job为“正在取消”状态。在第一次到达暂停点时,运行时抛出一个CancellationException,协程最终被取消。这种机制使我们能够安全地清理协程使用的资源。我们可以实施许多策略来清理资源,但首先,我们需要在示例中释放资源。我们可以定义代表我们办公室办公桌的 Desk 类:

class Desk : AutoCloseable {
    init {
        logger.info("Starting to work on the desk")
    }

    override fun close() {
        logger.info("Cleaning the desk")
    }
}

该类Desk实现了AutoCloseable接口。因此,它是协程取消期间释放资源的绝佳选择。由于它实现了AutoCloseable,我们可以使用该use函数在代码块完成时自动关闭资源:

suspend fun forgettingTheBirthDayRoutineAndCleaningTheDesk() {
    val desk = Desk()
    coroutineScope {
        val workingJob = launch {
            desk.use { _ ->
                workingConsciousness()
            }
        }
        launch {
            delay(2000L)
            workingJob.cancelAndJoin()
            logger.info("I forgot the birthday! Let's go to the mall!")
        }
    }
}

use 是 Kotlin 标准库中的一个扩展函数,主要用于自动管理需要关闭的资源(如文件、网络连接等)。它确保资源在使用完毕后被正确关闭,即使发生异常也不会遗漏。

正如预期的那样,在我们搬到商场之前,我们清理了桌子,日志也证实了这一点:

21:38:30.117 [main] INFO CoroutinesPlayground - Starting the morning routine
21:38:30.124 [main] INFO CoroutinesPlayground - Starting to work on the desk
21:38:30.226 [DefaultDispatcher-worker-1 @coroutine#1] INFO CoroutinesPlayground - Working
21:38:32.298 [DefaultDispatcher-worker-2 @coroutine#1] INFO CoroutinesPlayground - Cleaning the desk
21:38:32.298 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - I forgot the birthday! Let's go to the mall!
21:38:32.298 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Ending the morning routine

我们还可以使用invokeOnCompletion取消上的函数在函数完成workingConsciousness Job后清理桌面:

suspend fun forgettingTheBirthDayRoutineAndCleaningTheDeskOnCompletion() {
  val desk = Desk()
  coroutineScope {
    val workingJob = launch {
      workingConsciousness()
    }
    workingJob.invokeOnCompletion { exception: Throwable? ->
      desk.close()
    }
    launch {
      delay(2000L)
      workingJob.cancelAndJoin()
      logger.info("I forgot the birthday! Let's go to the mall!")
    }
  }
}

我们可以看到,该invokeOnCompletion方法将可空异常作为输入参数。如果Job被取消,则异常为CancellationException。

取消的另一个特性是它会传播到子协程。 当我们取消一个协程时,我们会隐式取消它的所有子协程 。让我们看一个例子。白天,保持水分是必不可少的。我们可以使用 drinkWater 来喝水:

suspend fun drinkWater() {
  while (true) {
    logger.info("Drinking water")
    delay(1000L)
    logger.info("Water drunk")
  }
}

然后,我们可以创建一个协程,并生成两个新的协程,分别用于工作和饮用水。最后,我们可以取消父协程,并且我们期望两个子协程也被取消:

suspend fun forgettingTheBirthDayWhileWorkingAndDrinkingWaterRoutine() {
    coroutineScope {
        val workingJob = launch {
            launch {
                workingConsciousness()
            }
            launch {
                drinkWater()
            }
        }
        launch {
            delay(2000L)
            workingJob.cancelAndJoin()
            logger.info("I forgot the birthday! Let's go to the mall!")
        }
    }
}

正如预期的那样,当我们取消 时workingJob,我们也会取消并停止其子协程。以下是描述情况的日志:

13:18:49.143 [main] INFO CoroutinesPlayground - Starting the morning routine
13:18:49.275 [DefaultDispatcher-worker-2 @coroutine#2] INFO CoroutinesPlayground - Working
13:18:49.285 [DefaultDispatcher-worker-3 @coroutine#3] INFO CoroutinesPlayground - Drinking water
13:18:50.285 [DefaultDispatcher-worker-3 @coroutine#3] INFO CoroutinesPlayground - Water drunk
13:18:50.286 [DefaultDispatcher-worker-3 @coroutine#3] INFO CoroutinesPlayground - Drinking water
13:18:51.288 [DefaultDispatcher-worker-2 @coroutine#3] INFO CoroutinesPlayground - Water drunk
13:18:51.288 [DefaultDispatcher-worker-2 @coroutine#3] INFO CoroutinesPlayground - Drinking water
13:18:51.357 [DefaultDispatcher-worker-2 @coroutine#4] INFO CoroutinesPlayground - I forgot the birthday! Let's go to the mall!
13:18:51.357 [DefaultDispatcher-worker-2 @coroutine#4] INFO CoroutinesPlayground - Ending the morning routine

这就是协程取消的全部内容!

协程上下文

在关于continuation的部分和关于构建器的部分中,我们简要介绍了协程上下文的概念。此外,CoroutineScope保留对协程上下文的引用。你可以想象, 这是一种存储从父级传递给子级的信息的方法 ,以在内部开发结构并发性。

表示协程上下文的类型称为CoroutineContext,它是 Kotlin 核心库的一部分。这是一个有趣的类型,因为它表示元素的集合,但同时,每个元素都是一个集合:

public interface CoroutineContext
// But also
public interface Element : CoroutineContext

CoroutineContext 的实现与 Continuation<T> 类型一起放在 Kotlin 协程库中。在实际实现中,我们有CoroutineName,它代表协程的名称:

val name: CoroutineContext = CoroutineName("Morning Routine")

此外,CoroutineDispatcher和Job类型实现了CoroutineContext接口。我们在上面的日志中看到的标识符是CoroutineId。当我们启用调试模式时,运行时会自动将此上下文添加到每个协程中。

由于 CoroutineContext 其行为类似于集合,因此该库还定义了向上下文添加元素的 + 运算符。因此,创建一个包含许多元素的新上下文非常简单:

val context: CoroutineContext = CoroutineName("Morning Routine") + Dispatchers.Default + Job()

也可以使用以下函数从上下文中删除元素minusKey:

val newContext: CoroutineContext = context.minusKey(CoroutineName)

我们应该记住,我们可以将上下文传递给构建器来更改所创建协程的行为。例如,假设我们想要创建一个使用 Dispatchers.Default 的特定名称的协程。在这种情况下,我们可以按如下方式执行:

suspend fun asynchronousGreeting() {
    coroutineScope {
        launch(CoroutineName("Greeting Coroutine") + Dispatchers.Default) {
            logger.info("Hello Everyone!")
        }
    }
}

我们在main函数内部运行一下,在日志中我们可以看到,这个协程以指定的名称创建,并在调度器中执行Default:

11:56:46.747 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Hello Everyone!

协程上下文也可以表现得像一个映射,因为我们可以使用与我们要检索的元素相对应的类型的名称来搜索和访问它包含的元素:

logger.info("Coroutine name: {}", context[CoroutineName]?.name)

上述代码打印了上下文中存储的协程名称(如果有)。CoroutineName方括号内的既不是类型也不是类。实际上,它引用了Key类的伴生对象,即只是一些 Kotlin 语法糖。

该库还定义了 EmptyCoroutineContext 空的协程上下文,我们可以将其用作“零”元素来创建新的自定义上下文。

因此,上下文是一种在协程之间传递信息的方式。任何父协程都会将其上下文提供给其子协程。子协程将值从父级复制到它们可以覆盖的上下文的新实例。让我们看一个没有覆盖的继承示例:

suspend fun coroutineCtxInheritance() {
    coroutineScope {
        launch(CoroutineName("Greeting Coroutine")) {
            logger.info("Hello everyone from the outer coroutine!")
            launch {
                logger.info("Hello everyone from the inner coroutine!")
            }
            delay(200L)
            logger.info("Hello again from the outer coroutine!")
        }
    }
}

上述代码的日志如下,它突出显示两个协程共享相同的名称:

12:19:12.962 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Hello everyone from the outer coroutine!
12:19:12.963 [DefaultDispatcher-worker-2 @Greeting Coroutine#2] INFO CoroutinesPlayground - Hello everyone from the inner coroutine!
12:19:12.963 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Hello again from the outer coroutine!

正如我们所说的,如果我们愿意,我们可以从子协程覆盖上下文中的值:

suspend fun coroutineCtxOverride() {
    coroutineScope {
        launch(CoroutineName("Greeting Coroutine")) {
            logger.info("Hello everyone from the outer coroutine!")
            launch(CoroutineName("Greeting Inner Coroutine")) {
                logger.info("Hello everyone from the inner coroutine!")
            }
            delay(200L)
            logger.info("Hello again from the outer coroutine!")
        }
    }
}

上面代码的log显示了父协程被覆盖了,但是父上下文中的值还是原来的:

12:22:33.869 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Hello everyone from the outer coroutine!
12:22:33.870 [DefaultDispatcher-worker-2 @Greeting Inner Coroutine#2] INFO CoroutinesPlayground - Hello everyone from the inner coroutine!
12:22:34.077 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Hello again from the outer coroutine!
12:22:34.078 [DefaultDispatcher-worker-1 @Greeting Coroutine#1] INFO CoroutinesPlayground - Ending the morning routine

上下文继承规则的唯一例外是Job上下文实例。每个新协程都会创建自己的Job实例,该实例不会从父级继承。而其他上下文元素(例如CoroutineName或调度程序)则从父级继承。