如何取消协程中的阻塞代码

How to cancel the blocking code in the coroutines

我的代码结构如下:

 @Throws(InterruptedException::class)
 fun method() {
     // do some blocking operations like Thread.sleep(...)
 }
 var job = launch {
     method()
 }
 job.cancelAndJoin()

method 由外部库提供,我无法控制它的行为。执行可能会花费很多时间,所以在某些情况下应该超时取消。

我可以使用kotlin协程库提供的withTimeout函数,但由于协程设计,它无法取消带有阻塞的代码。有一些解决方法吗?

主要思想是使用协程外上下文线程池,JVM线程可以在旧样式中被中断,并订阅协程执行中的取消事件。当事件被invokeOnCancellation捕获时,我们可以中断当前线程。

实施:

val externalThreadPool = Executors.newCachedThreadPool()
suspend fun <T> withTimeoutOrInterrupt(timeMillis: Long, block: () -> T) {
    withTimeout(timeMillis) {
        suspendCancellableCoroutine<Unit> { cont ->
            val future = externalThreadPool.submit {
                try {
                    block()
                    cont.resumeWith(Result.success(Unit))
                } catch (e: InterruptedException) {
                    cont.resumeWithException(CancellationException())
                } catch (e: Throwable) {
                    cont.resumeWithException(e);
                }
            }
            cont.invokeOnCancellation {
                future.cancel(true)
            }
        }
    }
}

它提供与通常的行为类似的行为 withTimeout,但它还支持 运行 带有阻塞的代码。

注意:只有当您知道内部代码使用阻塞并且可以正确处理抛出的 InterruptedException 时才应调用它。在大多数情况下,首选 withTimeout 函数。

更新: 自协程版本 1.3.7 以来,有一个新函数 runInterruptible,它提供相同的行为。所以这段代码可以简化:

suspend fun <T> withTimeoutOrInterrupt(timeMillis: Long, block: () -> T) {
    withTimeout(timeMillis) {
        runInterruptible(Dispatchers.IO) {
            block()
        }
    }
}