在 Kotlin Coroutines 中收集多个异步结果,忽略超时异常

Gather multiple async results in Kotlin Coroutines ignoring the exceptions with timeouts

我有一个 Generator class,基本上可以生成一些数据,例如:

interface Generator {
    suspend fun generate(): String?
}

有多种实现方式。其中一些可能会抛出异常,其中一些可能需要很长时间才能生成数据:

class Faulty : Generator {
    override suspend fun generate(): String? {
        println("Faulty")
        throw IllegalArgumentException();
    }
}

class Lingering : Generator {
    override suspend fun generate(): String? {
        println("Lingering")
        delay(Duration.ofHours(1))
        return null
    }
}

但有些实现是值得的

class Good : Generator {
    override suspend fun generate(): String {
        println("Good")
        return "Goooood"
    }
}

我需要做的是收集由预先配置的生成器列表生成的数据,为每个生成器设置超时 generate 并忽略异常(但记录它们):

fun main() = runBlocking {
    val generators = listOf(Faulty(), Lingering(), Good())
    val results = supervisorScope {
        generators
                .map { generator ->
                    async(CoroutineExceptionHandler { context, exception ->
                        println(exception)
                    }) {
                        withTimeoutOrNull(5000) {
                            generator.generate()
                        }
                    }
                }
                .awaitAll()
                .filterNotNull()
    }

    println(results)
}

问题是这段代码 fails 有例外:

Faulty
Lingering
Good
Exception in thread "main" java.lang.IllegalArgumentException
 at Faulty.generate (File.kt:12) 
 at FileKt$main$results.invokeSuspend (File.kt:41) 
 at FileKt$main$results.invoke (File.kt:-1) 

为什么 supervisorScope 没有捕捉到它?我做错了什么?

来自 CoroutineExceptionHandler 的文档:

An optional element in the coroutine context to handle uncaught exceptions.

A coroutine that was created using async always catches all its exceptions and represents them in the resulting Deferred object, so it cannot result in uncaught exceptions.

因此您的 async 作业不会发出未捕获的异常。该异常由稍后发生的 awaitAll() 调用重新抛出。您已将未捕获的异常处理程序仅放在 async 上下文中,因此不会被使用。

此外,子协程无论如何都不会发出未捕获的异常。 Their exceptions are delegated up to their root ancestor.

如上一节受监督协程中的异常所述here,监督范围的子级必须有一个使用处理程序的根协程。

您可以做的是将整个任务包装在一个使用处理程序的 launch 块中。由于某种原因,无法在 runBlocking 上安装处理程序。也许这不算作 root 作业?

fun main() = runBlocking{
    val job = GlobalScope.launch(CoroutineExceptionHandler { context, exception ->
        println(exception)
    }) {
        val generators = listOf(Faulty(), Lingering(), Good())
        val results =
            supervisorScope {
                generators
                    .map { generator ->
                        async {
                            withTimeoutOrNull(5000) {
                                generator.generate()
                            }
                        }
                    }
                    .awaitAll()
                    .filterNotNull()
            }

        println(results)
    }
    job.join()
}

但我认为您引入 CoroutineExceptionHandler 的唯一原因可能是忽略异常。该策略行不通,因为处理程序只处理未捕获的异常,这意味着恢复已经太晚了。那个时候作业已经失败了。您必须将 generate() 调用包装在 async 块中的 try/catch 或 runCatching.