是否可以使用 Kotlin Flow 进行多个并行调用并接受第一个 returns 的调用?

Is it possible to make several parallel calls and accept the first one that returns using Kotlin Flow?

基本上,我必须使用 OkHttp 并行向各种地址发出网络请求。我只关心第一个成功的结果。我可以在 Kotlin 上使用 Flow 执行此操作吗?

我一直在四处寻找,但我一直在努力并行地向 运行 发送请求,总是按顺序向 运行 发送请求。

该代码基本上采用地址列表,并且 return 应该是唯一有效的地址,或者如果 none 有效则为空。

谢谢。

编辑:我应该提到我计划在 Android 上使用它。我可能可以用 RX 做到这一点,但想学习 Flow。还试图限制我添加到应用程序的库。

编辑:我已将答案标记为正确,但我不是这样做的,但它让我非常接近我的做法,但由于我是 Flow 的新手,我不知道是否我的做法是正确的,尽管我很确定它在我的测试后是有效的。

我有一个函数在找不到时抛出 NoSuchElementException。它调用 searchForIPAsync,这是一个 suspend 函数,完成所有 OkHttp 工作和 returns true|false

@Throws(NoSuchElementException::class)
private suspend fun findWorkingIP(ipsToTest: MutableList<String>): String? = ipsToTest
        .asFlow()
        .flatMapMerge(ipsToTest.size)
        { impl ->
            flow<String?> {
                val res = connectionHelper.searchForIPAsync(getURLToTest(impl))
                if (res) {
                    emit(impl)
                } else {
                }
            }
        }.first()

然后我调用它并捕获异常以防万一找不到任何东西:

        try {
            val ipFound = findWorkingIP(ipsToTest)
            Log.w(TAG, "find: Got something " + ipFound);
            return ipFound
        } catch (ex: NoSuchElementException) {
            Log.w(TAG, "find: not found");
        }

虽然 is a close match to what you need, unfortunately as of Kotlin 1.3.2 the Flow implementation has a bug 中基于流的解决方案打破了它。该错误已经有一个建议的修复,因此应该在 Kotlin 的下一个补丁版本中解决这个问题。与此同时,这里有一个类似的解决方案,它使用 asyncChannel 代替:

suspend fun getShortUrl(urls: List<String>): String = coroutineScope {
    val chan = Channel<String?>()
    urls.forEach { url ->
        launch {
            try {
                fetchUrl(url)
            } catch (e: Exception) {
                null
            }.also { chan.send(it) }
        }
    }
    try {
        (1..urls.size).forEach { _ ->
            chan.receive()?.also { return@coroutineScope it }
        }
        throw Exception("All services failed")
    } finally {
        coroutineContext[Job]!!.cancelChildren()
    }
}