如何在 Kotlin 的公共池下创建单线程协程上下文?

How can I create single-thread coroutine context under Common pool in Kotlin?

短要求:能够创建协程上下文,仅在单线程中执行(例如,没有并行性)。

附加要求:这些任务最好使用现有的 CommonPool(例如线程池)

实际上,kotlin 协同程序有方法 newSingleThreadContext,它将创建单独的线程并将所有任务调度到其中。但是,这是专用线程,因此约 1000 个这样的上下文将需要大量资源。

因此,我希望上下文具有以下特征:

我发现,没有简单的解决方案来创建这样的上下文。

githuib 上有未解决的问题 - https://github.com/Kotlin/kotlinx.coroutines/issues/261

我想当我找到正确的解决方案时我会更新这个问题。

这是一个解决方案:

当您说 withSerialContext(Dispatchers.Default) {doWork()} 时,它会在默认调度程序线程上执行 doWork(),但它的所有部分都将像在 runBlocking{} 中一样一次执行一个。请注意,即使它一次是一个线程,也不能保证整个操作都是同一个线程。

suspend fun <T> withSerialContext(
        context: CoroutineDispatcher,
        block: suspend CoroutineScope.() -> T
): T = withContext(SerialContextDispatcher(context), block)

private class SerialContextDispatcher(private val target: CoroutineDispatcher) : CoroutineDispatcher() {

    private val q = ConcurrentLinkedQueue<Runnable>()
    //Whoever CASes this false->true schedules execution of runproc
    private val pending = AtomicBoolean(false)
    //Only one of these runs at a time
    private val runproc = object: Runnable {
        override fun run() {
            while(true) {
                val proc = q.poll();
                if (proc != null) {
                    try {
                        proc.run()
                    }
                    catch (e: Throwable) {
                        target.dispatch(EmptyCoroutineContext, this)
                        throw e
                    }
                } else {
                    pending.set(false);
                    if (q.isEmpty() || !pending.compareAndSet(false, true)) {
                        return
                    }
                }
            }
        }
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        q.add(block)
        if (pending.compareAndSet(false, true)) {
            target.dispatch(EmptyCoroutineContext, runproc)
        }
    }
}

1.6.0 version of the kotlinx.coroutines library we can use limitedParallelism function on the CoroutineDispatcher 对象开始,它允许您在不创建额外线程池的情况下限制并行度,并提供一种统一的方法来创建非绑定并行度的调度程序。

用法示例:

class UserRepository {
    private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)

    suspend fun getUserById(userId: Int): User? = withContext(dbDispatcher) {
        executeQuery("SELECT * FROM users WHERE id = ", userId).singleOrNull()
   }
}

limitedParallelism(1) 保证并行度限制 - 在此调度程序中最多可以并发执行 1 个协程。

应该可以解决问题:

Maximum one task can be executed at the same time.