fan-out/fan-in——关闭结果通道
Fan-out / fan-in - closing result channel
我正在生产项目,从多个协同例程中消费并推回 resultChannel。生产者在最后一项之后关闭其频道。
代码永远不会完成,因为 resultChannel 永远不会关闭。如何检测并正确完成迭代 hasNext()
return false
?
val inputData = (0..99).map { "Input$it" }
val threads = 10
val bundleProducer = produce<String>(CommonPool, threads) {
inputData.forEach { item ->
send(item)
println("Producing: $item")
}
println("Producing finished")
close()
}
val resultChannel = Channel<String>(threads)
repeat(threads) {
launch(CommonPool) {
bundleProducer.consumeEach {
println("CONSUMING $it")
resultChannel.send("Result ($it)")
}
}
}
val iterator = object : Iterator<String> {
val iterator = resultChannel.iterator()
override fun hasNext() = runBlocking { iterator.hasNext() }
override fun next() = runBlocking { iterator.next() }
}.asSequence()
println("Starting interation...")
val result = iterator.toList()
println("finish: ${result.size}")
您可以 运行 等待消费者完成然后关闭 resultChannel
的协程。
首先,重写启动消费者保存 Job
s 的代码:
val jobs = (1..threads).map {
launch(CommonPool) {
bundleProducer.consumeEach {
println("CONSUMING $it")
resultChannel.send("Result ($it)")
}
}
}
然后 运行 另一个协程在所有 Job
完成后关闭通道:
launch(CommonPool) {
jobs.forEach { it.join() }
resultChannel.close()
}
我正在生产项目,从多个协同例程中消费并推回 resultChannel。生产者在最后一项之后关闭其频道。
代码永远不会完成,因为 resultChannel 永远不会关闭。如何检测并正确完成迭代 hasNext()
return false
?
val inputData = (0..99).map { "Input$it" }
val threads = 10
val bundleProducer = produce<String>(CommonPool, threads) {
inputData.forEach { item ->
send(item)
println("Producing: $item")
}
println("Producing finished")
close()
}
val resultChannel = Channel<String>(threads)
repeat(threads) {
launch(CommonPool) {
bundleProducer.consumeEach {
println("CONSUMING $it")
resultChannel.send("Result ($it)")
}
}
}
val iterator = object : Iterator<String> {
val iterator = resultChannel.iterator()
override fun hasNext() = runBlocking { iterator.hasNext() }
override fun next() = runBlocking { iterator.next() }
}.asSequence()
println("Starting interation...")
val result = iterator.toList()
println("finish: ${result.size}")
您可以 运行 等待消费者完成然后关闭 resultChannel
的协程。
首先,重写启动消费者保存 Job
s 的代码:
val jobs = (1..threads).map {
launch(CommonPool) {
bundleProducer.consumeEach {
println("CONSUMING $it")
resultChannel.send("Result ($it)")
}
}
}
然后 运行 另一个协程在所有 Job
完成后关闭通道:
launch(CommonPool) {
jobs.forEach { it.join() }
resultChannel.close()
}