如何安全地 select 跨越某些可能同时关闭的渠道?
How to safely select across channels where some may get concurrently closed?
虽然 我试图实现一个主线程加入 CommonPool
并行执行多个独立任务的设置(这就是 java.util.streams
的操作方式).
我创建了与 CommonPool
线程一样多的 actor,外加一个主线程通道。演员使用集合点频道:
val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
actor<Task>(CommonPool) {
for (task in channel) {
task.execute().also { resultChannel.send(it) }
}
}
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel
这让我可以通过使用 select
表达式为每个任务找到空闲的 actor 来分配负载:
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
所以我发送所有任务并关闭频道:
launch(CommonPool) {
jobs.forEach { task ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
}
allComputeChannels.forEach { it.close() }
}
现在我要写主线程的代码了。在这里,我决定同时服务 mainComputeChannel
,执行提交给主线程的任务,以及 resultChannel
,将各个结果累加到最后的总和:
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_TASKS) {
select<Unit> {
mainComputeChannel.onReceive { task ->
task.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
resultChannel.close()
sum
}
这会导致 mainComputeChannel
可能从 CommonPool
线程关闭,但 resultChannel
仍需要服务的情况。如果通道关闭,onReceive
将抛出异常,onReceiveOrNull
将立即 select 和 null
。这两种选择都不可接受。如果 mainComputeChannel
已关闭,我也没有找到避免注册的方法。如果我使用 if (!mainComputeChannel.isClosedForReceive)
,注册调用将不是原子的。
这引出了我的问题:对于 select 在某些频道可能被另一个线程关闭而其他线程仍然存在的频道上,什么是好的惯用语?
kotlinx.coroutines
库目前缺少一个原语以方便使用。未完成的提议是为 select
添加 receiveOrClose
函数和 onReceiveOrClosed
子句,这将使编写这样的代码成为可能。
但是,您仍然需要手动跟踪 mainComputeChannel
已关闭的事实,并在关闭时停止选择它。因此,使用提议的 onReceiveOrClosed
子句,您将编写如下内容:
// outside of loop
var mainComputeChannelClosed = false
// inside loop
select<Unit> {
if (!mainComputeChannelClosed) {
mainComputeChannel.onReceiveOrClosed {
if (it.isClosed) mainComputeChannelClosed = true
else { /* do something with it */ }
}
}
// more clauses
}
有关详细信息,请参阅 https://github.com/Kotlin/kotlinx.coroutines/issues/330。
table 上没有进一步简化这种模式的提案。
虽然 CommonPool
并行执行多个独立任务的设置(这就是 java.util.streams
的操作方式).
我创建了与 CommonPool
线程一样多的 actor,外加一个主线程通道。演员使用集合点频道:
val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
actor<Task>(CommonPool) {
for (task in channel) {
task.execute().also { resultChannel.send(it) }
}
}
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel
这让我可以通过使用 select
表达式为每个任务找到空闲的 actor 来分配负载:
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
所以我发送所有任务并关闭频道:
launch(CommonPool) {
jobs.forEach { task ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
}
allComputeChannels.forEach { it.close() }
}
现在我要写主线程的代码了。在这里,我决定同时服务 mainComputeChannel
,执行提交给主线程的任务,以及 resultChannel
,将各个结果累加到最后的总和:
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_TASKS) {
select<Unit> {
mainComputeChannel.onReceive { task ->
task.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
resultChannel.close()
sum
}
这会导致 mainComputeChannel
可能从 CommonPool
线程关闭,但 resultChannel
仍需要服务的情况。如果通道关闭,onReceive
将抛出异常,onReceiveOrNull
将立即 select 和 null
。这两种选择都不可接受。如果 mainComputeChannel
已关闭,我也没有找到避免注册的方法。如果我使用 if (!mainComputeChannel.isClosedForReceive)
,注册调用将不是原子的。
这引出了我的问题:对于 select 在某些频道可能被另一个线程关闭而其他线程仍然存在的频道上,什么是好的惯用语?
kotlinx.coroutines
库目前缺少一个原语以方便使用。未完成的提议是为 select
添加 receiveOrClose
函数和 onReceiveOrClosed
子句,这将使编写这样的代码成为可能。
但是,您仍然需要手动跟踪 mainComputeChannel
已关闭的事实,并在关闭时停止选择它。因此,使用提议的 onReceiveOrClosed
子句,您将编写如下内容:
// outside of loop
var mainComputeChannelClosed = false
// inside loop
select<Unit> {
if (!mainComputeChannelClosed) {
mainComputeChannel.onReceiveOrClosed {
if (it.isClosed) mainComputeChannelClosed = true
else { /* do something with it */ }
}
}
// more clauses
}
有关详细信息,请参阅 https://github.com/Kotlin/kotlinx.coroutines/issues/330。
table 上没有进一步简化这种模式的提案。