在 Kotlin 中使用扩展函数缓存协程

Cahining coroutines by using extension functions in Kotlin

我想使用 Kotlin 的扩展函数链接 3 个协程。我知道如何用普通函数来做,但不能用扩展函数来管理它。事实上,在第二个协程中,我只能接收到从第一个协程发送的一个数据,仅此而已。该程序可以运行,但我在控制台上得到的只是 Doc: 1st Document。我做错了什么?

fun main(args: Array<String>) = runBlocking {
    produceDocs().docLength().report().consumeEach {
        println(it)
    }
}

private fun CoroutineScope.produceDocs() = produce {
    fun getDocs(): List<String> {
        return listOf("1st Document", "2nd Newer Document")
    }
    while (this.isActive) {
        val docs = getDocs()
        for (doc in docs) {
            send(doc)
        }
        delay(TimeUnit.SECONDS.toMillis(2))
    }
}

private suspend fun ReceiveChannel<String>.docLength(): ReceiveChannel<Int> = coroutineScope {
    val docsChannel: ReceiveChannel<String> = this@docLength

    produce {
        for (doc in docsChannel) {
            println("Doc: $doc") // OK. This works.
            send(doc.count()) // ??? Not sure where this sends data to?
        }
    }
}

private suspend fun ReceiveChannel<Int>.report(): ReceiveChannel<String> = coroutineScope {
    val docLengthChannel: ReceiveChannel<Int> = this@report

    produce {
        for (len in docLengthChannel) {
            println("Length: $len") // !!! Nothing arrived.
            send("Report. Document contains $len characters.")
        }
    }
}

你必须独立地消费每个通道才能使发射通过链,否则第一次发射永远不会被消耗:

private fun CoroutineScope.produceDocs() = produce {
    fun getDocs(): List<String> {
        return listOf("1st Document", "2nd Newer Document")
    }

    while (this.isActive) {
        val docs = getDocs()
        for (doc in docs) {
            send(doc)
        }
        delay(TimeUnit.SECONDS.toMillis(2))
    }
}

private suspend fun ReceiveChannel<String>.docLength() : ReceiveChannel<Int> = CoroutineScope(coroutineContext).produce {
    for (doc in this@docLength) {
        println("Doc: $doc") // OK. This works.
        send(doc.count()) // ??? Not sure where this sends data to?
    }
}

private suspend fun ReceiveChannel<Int>.report(): ReceiveChannel<String> = CoroutineScope(coroutineContext).produce {
    for (len in this@report) {
        println("Length: $len") // !!! Nothing arrived.
        send("Report. Document contains $len characters.")
    }
}

我建议您使用 Flow 做完全相同的事情的更好方法:

private fun produceDocs(): Flow<String> = flow {
    fun getDocs(): List<String> {
        return listOf("1st Document", "2nd Newer Document")
    }

    while (true) {
        val docs = getDocs()
        for (doc in docs) {
            emit(doc)
        }
        delay(TimeUnit.SECONDS.toMillis(2))
    }
}

private fun Flow<String>.docLength(): Flow<Int> = flow {
    collect { doc ->
        println("Doc: $doc")
        emit(doc.count())
    }
}

private fun Flow<Int>.report(): Flow<String> = flow {
    collect { len ->
        println("Length: $len")
        emit("Report. Document contains $len characters.")
    }
}

或者像这样更好:

private fun produceDocs(): Flow<String> = flow {
    fun getDocs(): List<String> {
        return listOf("1st Document", "2nd Newer Document")
    }

    while (true) {
        val docs = getDocs()
        for (doc in docs) {
            emit(doc)
        }
        delay(TimeUnit.SECONDS.toMillis(2))
    }
}

private fun Flow<String>.docLength(): Flow<Int> = transform { doc ->
    println("Doc: $doc")
    emit(doc.count())
}

private fun Flow<Int>.report(): Flow<String> = transform { len ->
    println("Length: $len")
    emit("Report. Document contains $len characters.")
}

然后像这样收集:

produceDocs().docLength().report().collect {
    println(it)
}

或者像这样更好:

produceDocs()
    .map { doc ->
        println("Doc: $doc")
        doc.count()
    }
    .map { len ->
        println("Length: $len")
        "Report. Document contains $len characters."
    }
    .collect {
        println(it)
    }