Kotlin:如何在没有 runBlocking 的情况下等待非挂起的协程?

Kotlin: How to wait for a coroutine from non-suspend without runBlocking?

编辑 2: 我想我误解了文档。我读:

runBlocking

This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.

意思是我根本不应该使用 runBlocking() 除了 main 或测试。但我现在意识到我读错了,特别是这部分:

It is designed to bridge regular blocking code to libraries that are written in suspending style

所以看来runBlocking应该用在这种场景下

但是我认为我应该充分理解上下文的主题,看看在这种情况下哪些上下文最适合传递给 runBlocking:

return runBlocking(???????) {
    job1.await() + job2.await()
}

编辑: 显然我对问题的措辞很糟糕,因为所有试图回答它的尝试都错过了实际问题和我提出的限制。因此,让我们尝试一种不同的方法...

这个有效:

fun doSomething(): Int {
    val job1 = GlobalScope.async { calculateSomething() }
    val job2 = GlobalScope.async { calculateSomething() }
    return runBlocking {
        job1.await() + job2.await()
    }
}

suspend fun calculateSomething(): Int {
    delay(1000L)
    return 13
}

suspend fun calculateSomethingElse(): Int {
    delay(2000L)
    return 19
}

我的问题是:无论如何我可以达到相同的结果:

  1. 完全不使用runBlocking()
  2. 没有doSomething()变成suspend函数。

?

换句话说:有什么我可以代替 ?????? 来完成以下工作吗?

fun doSomething(): Int {
    val job1 = GlobalScope.async { calculateSomething() }
    val job2 = GlobalScope.async { calculateSomething() }
    return ????????? {
        job1.?????() + job2.?????()
    }
}

suspend fun calculateSomething(): Int {
    delay(1000L)
    return 13
}

suspend fun calculateSomethingElse(): Int {
    delay(2000L)
    return 19
}

我有一个小的实用程序方法可以运行任何给定的外部命令和 returns 它的输出(即围绕 Java 进程 API 的小包装):

class RunningCommand(private val proc: Process) {

    fun waitFor(): String {
        proc.outputStream.close()

        var output = ""
        val outputRdr = thread { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = thread { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        outputRdr.join()
        errorRdr.join()

        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output$error")
        }

        return output
    }
}

这段代码工作正常。但是,它会为每个命令执行创建两个额外的线程。我想通过切换到协程来避免这种情况。我能够做到这一点,但我不得不使用 runBlocking:

class RunningCommand(private val proc: Process) {

    fun waitFor(): String {
        proc.outputStream.close()

        var output = ""
        val outputRdr = GlobalScope.async { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = GlobalScope.async { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        runBlocking {
            outputRdr.await()
            errorRdr.await()
        }

        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output${error}")
        }

        return output
    }
}

这段代码也有效,但我读到 runBlocking 应该只用于 main() 方法和测试,即不打算以这种方式使用。偷看它的实现,它看起来很糟糕,而且确实看起来像是人们不想从某些实用程序方法中重复调用的东西。

所以我的问题是:我还应该如何弥合阻塞代码和协程之间的差距?或者换句话说,从非 suspend 代码等待 suspend 函数的正确方法是什么?

或者仅仅是因为我的设计是错误的,为了在任何地方使用协同程序,我需要制作 main() 方法 runBlocking 并且基本上总是在某个协同程序范围内?

您可以使用在后台执行操作的调度程序创建您自己的范围。如果你想等待某件事完全完成执行,你可以使用 withContext。

private val myScope = CoroutineScope(Dispatchers.Main) 

myScope.launch {

   withContext(Dispatchers.IO) {
        //to stuff in the background
    }

}

你 运行 下面的代码,你会看到它打印 20,而不是 null。

fun main() {
    callSuspendFun()
}

suspend fun doWorkAndReturnDouble(num: Int): Int {
    delay(1000)
    return num * 2
}

fun callSuspendFun() {
    val coroutineScope = CoroutineScope(Dispatchers.Main)
    coroutineScope.launch {
        var num: Int? = null
        withContext(Dispatchers.IO) {
            val newNum = 10
            num = doWorkAndReturnDouble(newNum)
        }
        println(num)
    }
}

因此,要在不使用 运行Blocking 的情况下从非挂起函数调用挂起函数,您必须创建协程作用域。而withContext你等待代码的执行。

你应该使用coroutineScope

suspend fun waitFor(): String = coroutineScope {
    proc.outputStream.close()

    var output = ""
    val outputRdr = async { output = proc.inputStream.bufferedReader().use { it.readText() } }
    var error = ""
    val errorRdr = async { error = proc.errorStream.bufferedReader().use { it.readText() } }

    proc.waitFor()

        outputRdr.await()
        errorRdr.await()

    if (proc.exitValue() != 0) {
        throw RuntimeException("Command returned non-zero status: $output${error}")
    }

    return output
}

您可以将 CoroutineScope()Dispathers.IO 结合使用,这将在后台线程中启动协程并卸载您在该线程上的执行

class RunningCommand(private val proc: Process) {

fun waitFor(): String {
    // Starting a new coroutine on Background thread (IO)
    proc.outputStream.close()
    var output = ""
    CoroutineScope(Dispatchers.Unconfined).async {

        val outputRdr = async { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = async { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        // No need of runBlocking 
        // await() call would wait for completion of its async block
        outputRdr.await() // Execution would block until outputRdr block completion on IO thread
        // At this stage, outputRdr block is completed
        errorRdr.await() // Execution would block until errorRdr block completion on IO thread
        // At this stage, errorRdr block is completed


        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output${error}")
        }
    return@async output
    }
    return output
}

}

注意:如果您从任何协程上下文调用 waitFor() 方法,您可以通过编写 coroutineScope { } 而不是 CoroutineScope(Dispatchers.IO).launch { } 来在同一个协程上下文上继续工作,管理结构化并发适当地。

对于以后和我犯同样错误的旅行者 - runBlocking 不仅可以在 main / 测试中使用 - 而且还可以:

It is designed to bridge regular blocking code to libraries that are written in suspending style

不知何故,我的印象是仅用于某些库函数是邪恶的,但事实并非如此。