如何在 Kotlin 的公共池下创建单线程协程上下文?
How can I create single-thread coroutine context under Common pool in Kotlin?
短要求:能够创建协程上下文,仅在单线程中执行(例如,没有并行性)。
附加要求:这些任务最好使用现有的 CommonPool(例如线程池)
实际上,kotlin 协同程序有方法 newSingleThreadContext
,它将创建单独的线程并将所有任务调度到其中。但是,这是专用线程,因此约 1000 个这样的上下文将需要大量资源。
因此,我希望上下文具有以下特征:
- 最多可同时执行一个任务
- 此上下文应重用任何其他上下文(例如父上下文)。例如上下文不应包含额外的线程
我发现,没有简单的解决方案来创建这样的上下文。
githuib 上有未解决的问题 - https://github.com/Kotlin/kotlinx.coroutines/issues/261
我想当我找到正确的解决方案时我会更新这个问题。
这是一个解决方案:
当您说 withSerialContext(Dispatchers.Default) {doWork()}
时,它会在默认调度程序线程上执行 doWork()
,但它的所有部分都将像在 runBlocking{} 中一样一次执行一个。请注意,即使它一次是一个线程,也不能保证整个操作都是同一个线程。
suspend fun <T> withSerialContext(
context: CoroutineDispatcher,
block: suspend CoroutineScope.() -> T
): T = withContext(SerialContextDispatcher(context), block)
private class SerialContextDispatcher(private val target: CoroutineDispatcher) : CoroutineDispatcher() {
private val q = ConcurrentLinkedQueue<Runnable>()
//Whoever CASes this false->true schedules execution of runproc
private val pending = AtomicBoolean(false)
//Only one of these runs at a time
private val runproc = object: Runnable {
override fun run() {
while(true) {
val proc = q.poll();
if (proc != null) {
try {
proc.run()
}
catch (e: Throwable) {
target.dispatch(EmptyCoroutineContext, this)
throw e
}
} else {
pending.set(false);
if (q.isEmpty() || !pending.compareAndSet(false, true)) {
return
}
}
}
}
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
q.add(block)
if (pending.compareAndSet(false, true)) {
target.dispatch(EmptyCoroutineContext, runproc)
}
}
}
从 1.6.0
version of the kotlinx.coroutines
library we can use limitedParallelism
function on the CoroutineDispatcher
对象开始,它允许您在不创建额外线程池的情况下限制并行度,并提供一种统一的方法来创建非绑定并行度的调度程序。
用法示例:
class UserRepository {
private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun getUserById(userId: Int): User? = withContext(dbDispatcher) {
executeQuery("SELECT * FROM users WHERE id = ", userId).singleOrNull()
}
}
limitedParallelism(1)
保证并行度限制 - 在此调度程序中最多可以并发执行 1 个协程。
应该可以解决问题:
Maximum one task can be executed at the same time.
短要求:能够创建协程上下文,仅在单线程中执行(例如,没有并行性)。
附加要求:这些任务最好使用现有的 CommonPool(例如线程池)
实际上,kotlin 协同程序有方法 newSingleThreadContext
,它将创建单独的线程并将所有任务调度到其中。但是,这是专用线程,因此约 1000 个这样的上下文将需要大量资源。
因此,我希望上下文具有以下特征:
- 最多可同时执行一个任务
- 此上下文应重用任何其他上下文(例如父上下文)。例如上下文不应包含额外的线程
我发现,没有简单的解决方案来创建这样的上下文。
githuib 上有未解决的问题 - https://github.com/Kotlin/kotlinx.coroutines/issues/261
我想当我找到正确的解决方案时我会更新这个问题。
这是一个解决方案:
当您说 withSerialContext(Dispatchers.Default) {doWork()}
时,它会在默认调度程序线程上执行 doWork()
,但它的所有部分都将像在 runBlocking{} 中一样一次执行一个。请注意,即使它一次是一个线程,也不能保证整个操作都是同一个线程。
suspend fun <T> withSerialContext(
context: CoroutineDispatcher,
block: suspend CoroutineScope.() -> T
): T = withContext(SerialContextDispatcher(context), block)
private class SerialContextDispatcher(private val target: CoroutineDispatcher) : CoroutineDispatcher() {
private val q = ConcurrentLinkedQueue<Runnable>()
//Whoever CASes this false->true schedules execution of runproc
private val pending = AtomicBoolean(false)
//Only one of these runs at a time
private val runproc = object: Runnable {
override fun run() {
while(true) {
val proc = q.poll();
if (proc != null) {
try {
proc.run()
}
catch (e: Throwable) {
target.dispatch(EmptyCoroutineContext, this)
throw e
}
} else {
pending.set(false);
if (q.isEmpty() || !pending.compareAndSet(false, true)) {
return
}
}
}
}
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
q.add(block)
if (pending.compareAndSet(false, true)) {
target.dispatch(EmptyCoroutineContext, runproc)
}
}
}
从 1.6.0
version of the kotlinx.coroutines
library we can use limitedParallelism
function on the CoroutineDispatcher
对象开始,它允许您在不创建额外线程池的情况下限制并行度,并提供一种统一的方法来创建非绑定并行度的调度程序。
用法示例:
class UserRepository {
private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun getUserById(userId: Int): User? = withContext(dbDispatcher) {
executeQuery("SELECT * FROM users WHERE id = ", userId).singleOrNull()
}
}
limitedParallelism(1)
保证并行度限制 - 在此调度程序中最多可以并发执行 1 个协程。
应该可以解决问题:
Maximum one task can be executed at the same time.