运行 昂贵/UI 并行协程

Run expensive / UI coroutines in parallel

给定以下代码示例:

@ExperimentalCoroutinesApi
fun main() {
    val inputChannel = Channel<Unit>()
    val outputChannel = Channel<String>()
    val engine = Engine(outputChannel)
    val calculator = Logger()

    runBlocking(Dispatchers.Default) {
        launch {
            engine.connect(inputChannel)
        }

        launch {
            calculator.connect(outputChannel)
        }

        inputChannel.send(Unit)
        inputChannel.send(Unit)
    }
}

class Engine(private val logger: SendChannel<String>) {

    private val pool = Executors.newCachedThreadPool().asCoroutineDispatcher()

    @ExperimentalCoroutinesApi
    suspend fun connect(input: ReceiveChannel<Unit>) {
        input.consumeEach {
            println("${Instant.now()} [${Thread.currentThread().name}] Engine - Received input")
            // What to do here so `calc()` runs in parallel, not blocking the channel?
            calc()
        }
    }

    suspend fun calc() {
        logger.send("Starting processing")
        for (i in 1..100) {
            delay(100)
            print(".")
        }
        println()
        logger.send("Finished processing")
    }
}

class Logger {

    @ExperimentalCoroutinesApi
    suspend fun connect(channel: ReceiveChannel<String>) {
        channel.consumeEach {
            println("${Instant.now()} [${Thread.currentThread().name}] Logger - $it")
        }
    }
}


日志输出显示

2021-11-14T11:50:21.021357700Z [main] Engine - Received input
2021-11-14T11:50:21.025358900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:31.829252900Z [main] Engine - Received input
2021-11-14T11:50:31.829252900Z [main] Logger - Finished processing
2021-11-14T11:50:31.829252900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:42.686953100Z [main] Logger - Finished processing

所有事情都发生在同一个线程上。但是,我希望 Engine 能够并行计算输入(这只是一个 MWE,而不是需要 运行 并行处理的更大更复杂的应用程序)。

我需要在此代码中更改什么,以便发送到 inputChannel 的消息会产生多个...“范围”(?),其中可以进行计算 可以在处理过程中向 outputChannel 发送消息吗?

通过像这样使用 runBlocking(),您将启动一个单线程调度程序,所有子协程都将在该单线程上调度。如果您需要执行大部分 CPU 密集型计算,请改用 Dispatchers.Default

runBlocking(Dispatchers.Default) { ... }

然后,如果您需要在这段代码中执行阻塞IO,请将其封装在:

withContext(Dispatchers.IO) { ... }