所有生产者协程完成后如何关闭通道?
How to close the channel after all producer coroutines are done?
考虑以下代码:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
for (x in channel) {
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
原始版本有这样的接收协程:
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
它预计频道中只有 3 条消息。如果我将它更改为第一个版本,那么我需要在所有生产者协程完成后关闭通道。我该怎么做?
一个可能的解决方案是创建一个等待所有 channel.send()
完成的作业,并在该作业的 invokeOnCompletion
中调用 channel.close()
:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
}.invokeOnCompletion {
channel.close()
}
launch {
for (x in channel) {
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
考虑以下代码:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
for (x in channel) {
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
原始版本有这样的接收协程:
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
它预计频道中只有 3 条消息。如果我将它更改为第一个版本,那么我需要在所有生产者协程完成后关闭通道。我该怎么做?
一个可能的解决方案是创建一个等待所有 channel.send()
完成的作业,并在该作业的 invokeOnCompletion
中调用 channel.close()
:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
}.invokeOnCompletion {
channel.close()
}
launch {
for (x in channel) {
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}