如何安全地 select 跨越某些可能同时关闭的渠道?

How to safely select across channels where some may get concurrently closed?

虽然 我试图实现一个主线程加入 CommonPool 并行执行多个独立任务的设置(这就是 java.util.streams 的操作方式).

我创建了与 CommonPool 线程一样多的 actor,外加一个主线程通道。演员使用集合点频道:

val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
    actor<Task>(CommonPool) {
        for (task in channel) {
            task.execute().also { resultChannel.send(it) }
        }
    }
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel

这让我可以通过使用 select 表达式为每个任务找到空闲的 actor 来分配负载:

select {
    allComputeChannels.forEach { chan ->
        chan.onSend(task) {}
    }
}

所以我发送所有任务并关闭频道:

launch(CommonPool) {
    jobs.forEach { task ->
        select {
            allComputeChannels.forEach { chan ->
                chan.onSend(task) {}
            }
        }
    }
    allComputeChannels.forEach { it.close() }
}

现在我要写主线程的代码了。在这里,我决定同时服务 mainComputeChannel,执行提交给主线程的任务,以及 resultChannel,将各个结果累加到最后的总和:

return runBlocking {
    var completedCount = 0
    var sum = 0.0
    while (completedCount < NUM_TASKS) {
        select<Unit> {
            mainComputeChannel.onReceive { task ->
                task.execute().also { resultChannel.send(it) }
            }
            resultChannel.onReceive { result ->
                sum += result
                completedCount++
            }
        }
    }
    resultChannel.close()
    sum
}

这会导致 mainComputeChannel 可能从 CommonPool 线程关闭,但 resultChannel 仍需要服务的情况。如果通道关闭,onReceive 将抛出异常,onReceiveOrNull 将立即 select 和 null。这两种选择都不可接受。如果 mainComputeChannel 已关闭,我也没有找到避免注册的方法。如果我使用 if (!mainComputeChannel.isClosedForReceive),注册调用将不是原子的。

这引出了我的问题:对于 select 在某些频道可能被另一个线程关闭而其他线程仍然存在的频道上,什么是好的惯用语?

kotlinx.coroutines 库目前缺少一个原语以方便使用。未完成的提议是为 select 添加 receiveOrClose 函数和 onReceiveOrClosed 子句,这将使编写这样的代码成为可能。

但是,您仍然需要手动跟踪 mainComputeChannel 已关闭的事实,并在关闭时停止选择它。因此,使用提议的 onReceiveOrClosed 子句,您将编写如下内容:

// outside of loop
var mainComputeChannelClosed = false
// inside loop
select<Unit> {
    if (!mainComputeChannelClosed) {
        mainComputeChannel.onReceiveOrClosed { 
            if (it.isClosed) mainComputeChannelClosed = true 
            else { /* do something with it */ }
        }
    }
    // more clauses
}    

有关详细信息,请参阅 https://github.com/Kotlin/kotlinx.coroutines/issues/330

table 上没有进一步简化这种模式的提案。