运行 昂贵/UI 并行协程
Run expensive / UI coroutines in parallel
给定以下代码示例:
@ExperimentalCoroutinesApi
fun main() {
val inputChannel = Channel<Unit>()
val outputChannel = Channel<String>()
val engine = Engine(outputChannel)
val calculator = Logger()
runBlocking(Dispatchers.Default) {
launch {
engine.connect(inputChannel)
}
launch {
calculator.connect(outputChannel)
}
inputChannel.send(Unit)
inputChannel.send(Unit)
}
}
class Engine(private val logger: SendChannel<String>) {
private val pool = Executors.newCachedThreadPool().asCoroutineDispatcher()
@ExperimentalCoroutinesApi
suspend fun connect(input: ReceiveChannel<Unit>) {
input.consumeEach {
println("${Instant.now()} [${Thread.currentThread().name}] Engine - Received input")
// What to do here so `calc()` runs in parallel, not blocking the channel?
calc()
}
}
suspend fun calc() {
logger.send("Starting processing")
for (i in 1..100) {
delay(100)
print(".")
}
println()
logger.send("Finished processing")
}
}
class Logger {
@ExperimentalCoroutinesApi
suspend fun connect(channel: ReceiveChannel<String>) {
channel.consumeEach {
println("${Instant.now()} [${Thread.currentThread().name}] Logger - $it")
}
}
}
日志输出显示
2021-11-14T11:50:21.021357700Z [main] Engine - Received input
2021-11-14T11:50:21.025358900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:31.829252900Z [main] Engine - Received input
2021-11-14T11:50:31.829252900Z [main] Logger - Finished processing
2021-11-14T11:50:31.829252900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:42.686953100Z [main] Logger - Finished processing
所有事情都发生在同一个线程上。但是,我希望 Engine
能够并行计算输入(这只是一个 MWE,而不是需要 运行 并行处理的更大更复杂的应用程序)。
我需要在此代码中更改什么,以便发送到 inputChannel
的消息会产生多个...“范围”(?),其中可以进行计算 和 可以在处理过程中向 outputChannel
发送消息吗?
通过像这样使用 runBlocking()
,您将启动一个单线程调度程序,所有子协程都将在该单线程上调度。如果您需要执行大部分 CPU 密集型计算,请改用 Dispatchers.Default:
runBlocking(Dispatchers.Default) { ... }
然后,如果您需要在这段代码中执行阻塞IO,请将其封装在:
withContext(Dispatchers.IO) { ... }
给定以下代码示例:
@ExperimentalCoroutinesApi
fun main() {
val inputChannel = Channel<Unit>()
val outputChannel = Channel<String>()
val engine = Engine(outputChannel)
val calculator = Logger()
runBlocking(Dispatchers.Default) {
launch {
engine.connect(inputChannel)
}
launch {
calculator.connect(outputChannel)
}
inputChannel.send(Unit)
inputChannel.send(Unit)
}
}
class Engine(private val logger: SendChannel<String>) {
private val pool = Executors.newCachedThreadPool().asCoroutineDispatcher()
@ExperimentalCoroutinesApi
suspend fun connect(input: ReceiveChannel<Unit>) {
input.consumeEach {
println("${Instant.now()} [${Thread.currentThread().name}] Engine - Received input")
// What to do here so `calc()` runs in parallel, not blocking the channel?
calc()
}
}
suspend fun calc() {
logger.send("Starting processing")
for (i in 1..100) {
delay(100)
print(".")
}
println()
logger.send("Finished processing")
}
}
class Logger {
@ExperimentalCoroutinesApi
suspend fun connect(channel: ReceiveChannel<String>) {
channel.consumeEach {
println("${Instant.now()} [${Thread.currentThread().name}] Logger - $it")
}
}
}
日志输出显示
2021-11-14T11:50:21.021357700Z [main] Engine - Received input
2021-11-14T11:50:21.025358900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:31.829252900Z [main] Engine - Received input
2021-11-14T11:50:31.829252900Z [main] Logger - Finished processing
2021-11-14T11:50:31.829252900Z [main] Logger - Starting processing
....................................................................................................
2021-11-14T11:50:42.686953100Z [main] Logger - Finished processing
所有事情都发生在同一个线程上。但是,我希望 Engine
能够并行计算输入(这只是一个 MWE,而不是需要 运行 并行处理的更大更复杂的应用程序)。
我需要在此代码中更改什么,以便发送到 inputChannel
的消息会产生多个...“范围”(?),其中可以进行计算 和 可以在处理过程中向 outputChannel
发送消息吗?
通过像这样使用 runBlocking()
,您将启动一个单线程调度程序,所有子协程都将在该单线程上调度。如果您需要执行大部分 CPU 密集型计算,请改用 Dispatchers.Default:
runBlocking(Dispatchers.Default) { ... }
然后,如果您需要在这段代码中执行阻塞IO,请将其封装在:
withContext(Dispatchers.IO) { ... }