异步{}内部流程

async{} inside flow

我可以在 kotlin 流中使用 async{} 吗?

场景:在 API 调用之后,我得到了一个包含 200 个我需要解析(转换为 UIObject)对象的列表。 我正在尝试并行处理这个列表。 下面是伪代码:

 fun getUIObjectListFlow():Flow<List<UIObject>> {
    flow<List<UIObject>> {
        while (stream.hasNext()) {
            val data = stream.getData() // reading from an input stream. Data comes in chunk

            val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
            val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
            val processedList = firstHalfDeffered.await() + secondHalfDeffered.await() // just pseudo code

            emit(processedList)
        }
    }
}

由于 async{} 需要协程作用域(例如:someScope.async{}),我怎样才能在 flow 中获得作用域?有没有其他方法可以做到这一点?

这个函数在存储库中,我正在从视图模型中调用它。

谢谢

(初始问题的原始答案)

正如@broot 在评论中提到的,如果您想要生成单个项目(即使该单个项目是一个集合),则不需要 Flow<T>。 通常,您只需要一个 suspend 函数(或本例中的一段暂停代码),而不是 return 是 Flow.

的函数

现在,无论您是否保留单项流程,您都可以使用 coroutineScope { ... } 挂起函数来定义一个本地范围,您可以从中启动协程。这个函数做了一些事情:

  1. 它提供了一个范围来启动子协程
  2. 它会暂停,直到所有子协程完成
  3. 它return是一个基于块中最后一个表达式的值(lambda 的“return”值)

它可能是这样的:

val uiObjects = coroutineScope { //this: CoroutineScope
    val list = getDataFromServer()
            
    val firstHalf = async(Dispatchers.IO) { /*process first half of the list */ }
    val secondHalf = async(Dispatchers.IO) { /*process second half of the list */ }
            
    // the last expression from the block is what the uiObjects variable gets
    firstHalf.await() + secondHalf.await()
}

编辑:给定问题更新,这里是一些更新的代码。您仍然应该使用 coroutineScope 为您的短期协程创建本地范围:

fun getUIObjectListFlow(): Flow<List<UIObject>> = flow<List<UIObject>> {
    while (stream.hasNext()) {
        val data = stream.getData() // reading from an input stream. Data comes in chunk

        val processedList = coroutineScope {
            val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
            val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
            firstHalfDeffered.await() + secondHalfDeffered.await() 
        }
        emit(processedList)
    }
}