如何避免在执行 long-运行 计算的 Spring WebFlux 控制器中使用 Kotlin Coroutines 的 GlobalScope

How to avoid using Kotlin Coroutines' GlobalScope in a Spring WebFlux controller that performs long-running computations

我有一个 Rest API,它是使用 Spring WebFlux 和 Kotlin 实现的,其端点用于启动长时间的 运行 计算。由于让调用者等到计算完成并不是很优雅,它应该立即 return 一个 ID,一旦它可用,调用者可以使用它在不同的端点上获取结果。计算在后台开始,只要准备就绪就应该完成 - 我真的不关心它何时完成,因为它是调用者的工作来为它进行轮询。

当我使用 Kotlin 时,我认为解决这个问题的规范方法是使用协程。这是我的实现方式的一个最小示例(使用 Spring's Kotlin DSL 而不是传统控制器):

import org.springframework.web.reactive.function.server.coRouter

// ...

fun route() = coRouter {
    POST("/big-computation") { request: ServerRequest ->
        val params = request.awaitBody<LongRunningComputationParams>()
        val runId = GlobalResultStorage.prepareRun(params);
        coroutineScope {
            launch(Dispatchers.Default) {
                GlobalResultStorage.addResult(runId, longRunningComputation(params))
            }
        }
        ok().bodyValueAndAwait(runId)
    }
}

不过,这并没有达到我的要求,因为外部协程(POST("/big-computation") 之后的块)一直等到其内部协程完成执行,因此只有 returning runId 在一开始就不再需要它之后。

我能找到的唯一可能的方法是使用 GlobalScope.launch,它会生成一个没有等待其结果的父进程的协程,但我到处都读到强烈建议您不要使用它。明确一点,有效的代码如下所示:

POST("/big-computation") { request: ServerRequest ->
    val params = request.awaitBody<LongRunningComputationParams>()
    val runId = GlobalResultStorage.prepareRun(params);
    GlobalScope.launch {
        GlobalResultStorage.addResult(runId, longRunningComputation(params))
    }
    ok().bodyValueAndAwait(runId)
}

我是否遗漏了一些非常明显的东西,这些东西会使我的示例使用适当的结构化并发工作,或者这真的是 GlobalScope 的合法用例吗?有没有办法在一个不附加到它的启动范围的范围内启动长 运行 计算的协程?我能想到的唯一想法是从同一个 coroutineScope 启动计算和请求处理程序,但是因为计算取决于请求处理程序,所以我不明白这怎么可能。

提前致谢!

也许其他人不会同意我的看法,但我认为这种对 GlobalScope 的厌恶有点夸张了。我经常有这样的印象,有些人并不真正理解 GlobalScope 的问题所在,他们用具有相似缺点或实际上相同的解决方案代替它。但是,至少他们不再使用邪恶 GlobalScope...

不要误会我的意思:GlobalScope 不好。特别是因为它太容易使用了,所以很容易过度使用它。但是也有很多情况我们并不真正关心它的缺点。

结构化并发的主要目标是:

  • 自动等待子任务,这样我们就不会在子任务完成之前不小心继续。
  • 取消个别作业。
  • 安排后台任务的
  • Cancelling/shutting service/component。
  • 在异步任务之间传播失败。

这些功能对于提供可靠的并发应用程序至关重要,但在很多情况下 none 确实很重要。让我们举个例子:如果您的请求处理程序在应用程序的整个时间内都在工作,那么您不需要既等待子任务又关闭功能。您不想传播失败。取消单个子任务在这里并不适用,因为无论我们使用 GlobalScope 还是“适当的”解决方案,我们都完全相同 - 通过将任务的 Job 存储在某处。

因此,我想说 GlobalScope 不被鼓励的主要原因,不适用于您的情况。

话虽如此,我仍然认为实施通常建议作为 GlobalScope 的适当替代方案的解决方案可能是值得的。只需使用您自己的 CoroutineScope 创建一个 属性 并使用它来启动协程:

private val scope = CoroutineScope(Dispatchers.Default)

fun route() = coRouter {
    POST("/big-computation") { request: ServerRequest ->
        ...
        scope.launch {
            GlobalResultStorage.addResult(runId, longRunningComputation(params))
        }
        ...
    }
}

你不会从中得到太多。它不会帮助您解决资源泄漏问题,也不会使您的代码更可靠或其他什么。但至少它将有助于以某种方式对后台任务进行分类。技术上可以确定谁是后台任务的所有者。您可以轻松地在一个地方配置所有后台任务,例如提供 CoroutineName 或切换到另一个线程池。您可以计算当前有多少活动子任务。如果需要,添加优雅关机会更容易。等等。

但最重要的是:实施起来很便宜。你不会得到太多,但也不会花费你太多,所以为什么不呢。