运行 多个并行协程(具有 return 值)

run several coroutines in parallel (with return value)

我是 kotlin 的新手,我正在尝试 运行 多个并行线程中的 Web 请求

到目前为止我得到了

class HttpClient {
    private val DEFAULT_BASE_URL = "https://someapi"

    fun fetch(endPoint: String, page: Int): String {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to page)).response()
        return String(response.data)
    }

    fun headers(endPoint: String): Headers {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to 1)).response()
        return response.headers
    }
}

和运行的class全过程

class Fetcher(private val page: Int) {
    suspend fun run(): String = coroutineScope {
        async {
            HttpClient().fetch(DEFAULT_ENDPOINT, page)
        }
    }.await()


    companion object {

        private const val DEFAULT_ENDPOINT = "endpoint"

        suspend fun fetchAll(): MutableList<String> {

            val totalThreads = (totalCount() / pageSize()) + 1

            return runBlocking {
                var deck: MutableList<String> = mutableListOf()
                for (i in 1..totalThreads) {
                    deck.add(Fetcher(i).run())
                }

                deck
            }
        }

        private fun pageSize(): Int {
            return HttpClient().headers(DEFAULT_ENDPOINT)["page-size"].first().toInt()
        }

        private fun totalCount(): Int {
            return HttpClient().headers(DEFAULT_ENDPOINT)["total-count"].first().toInt()
        }
    }
}

我想从 Java 镜像 Thread.join()。你能给我一些关于如何改进我的代码以实现这一目标的建议吗?

此外,如果要求不高,您能否建议 book/example 关于这个主题的设置?

提前感谢您的帮助!

几点:

  1. 如果您要在项目中使用协同程序,您将主要希望公开挂起函数而不是阻塞函数。我不使用 Fuel,但我看到它 has a coroutines library 及其阻塞函数的 suspend 函数版本。通常,解包异步结果的挂起函数中包含“await”一词。我不确定 response() 是什么,因为我不使用燃料,但如果我不得不猜测,您可以使用 awaitResponse() 代替,然后使函数成为 suspend 函数。

  2. 与协程无关,但几乎没有理由使用 String 构造函数来包装另一个字符串,因为字符串是不可变的。 (你需要像这样在内存中复制一个字符串的唯一原因可能是你在某种奇怪的集合中使用它,这种集合使用身份比较而不是 `==`` 比较,并且你需要将它视为不同的值。)

  3. 也与协程无关,但您的情况下的 HttpClient 应该是单例 object 因为它没有状态。这样你就不需要在使用它时实例化它,也不必担心在 属性.

    中持有一个引用
  4. 从不 在挂起函数中使用 runBlocking。挂起函数绝不能阻塞。 runBlocking 创建一个阻塞函数。 runBlocking 应该出现在应用程序中的唯一两个地方是 CLI 应用程序的顶层 main 函数,或者在同时具有协程和其他一些 thread-management 库的应用程序中需要将挂起函数转换为阻塞 non-suspend 函数,以便它们可以被 non-coroutine-based 代码使用。

  5. 没有理由在 async() 之后立即跟进 await() 如果您没有同时进行其他操作。您可以只使用 withContext 代替。如果您不需要使用特定的调度程序来调用代码(如果它是一个挂起函数则不需要),那么您甚至不需要 withContext。您可以直接在协程中调用挂起函数。

  6. 没有理由使用 coroutineScope { } 来包装单个子协程。它用于 运行 多个子协程并等待所有子协程。

所以,如果我们把HttpClient的函数改成suspend函数,那么Fetcher.run就变得很简单了。

我还认为 Fetcher 是一个带有单个 属性 的 class 有点奇怪,它仅用于 one-off 时尚且具有其唯一功能.相反,如果 Fetcher 是一个单例对象,而 run 拥有它需要的参数,那么 straight-forward 会更好。那么您也不需要伴随对象,因为 Fetcher 作为对象可以直接承载这些功能。

最后,您实际询问的部分:运行 协程中的并行任务,使用 coroutineScope { } 然后在其中启动 async 协程并等待它们。 map 函数可以很方便地使用您可以迭代的东西来执行此操作,然后您可以使用 awaitAll()。您还可以并行获得 totalCountpageSize

综合考虑:

object HttpClient {
    private val DEFAULT_BASE_URL = "https://someapi"

    suspend fun fetch(endPoint: String, page: Int): String {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to page)).awaitResponse()
        return response.data
    }

    suspend fun headers(endPoint: String): Headers {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to 1)).awaitResponse()
        return response.headers
    }
}
object Fetcher() {
    suspend fun run(page: Int): String = 
        HttpClient.fetch(DEFAULT_ENDPOINT, page)

    private const val DEFAULT_ENDPOINT = "endpoint"

    suspend fun fetchAll(): List<String> {
        val totalThreads = coroutineScope {
            val totalCount = async { totalCount() }
            val pageSize = async { pageSize() }
            (totalCount.await() / pageSize.await()) + 1
        }
        return coroutineScope {
            (1..totalThreads).map { i ->
                async { run(i) }
            }.awaitAll()
        }
    }

    private suspend fun pageSize(): Int {
        return HttpClient.headers(DEFAULT_ENDPOINT)["page-size"].first().toInt()
    }

    private suspend fun totalCount(): Int {
        return HttpClient.headers(DEFAULT_ENDPOINT)["total-count"].first().toInt()
    }

}

我将 MutableList 更改为 List,因为它更简单,而且通常不需要 MutableList。如果你真的需要一个,你可以打电话给 toMutableList()