如何使用改造阻塞客户端和协程实现有限调用

how to implement a limited call with retrofit blocking client and coroutines

我有以下代码:

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool")
val total = 1_000_000 //can be other number as well
val maxLimit = 1_000
return runBlocking {
  (0..total step maxLimit).map {
    async(context) {
      val offset = it
      val limit = it + maxLimit
      blockingHttpCall(offset, limit)
    }
  }.flatMap {
    it.await()
  }.associateBy {
    ...
  }.toMutableMap()
}

我希望只有 10 个调用同时发生在阻塞 api 上。 但是,上面的代码似乎并没有像我预期的那样执行(我认为所有调用都会立即开始),或者至少我不明白它是否会执行。
实施它的正确方法是什么? 如果我使用改造的异步 api,相同的解决方案是否有效?

我不完全了解你的情况,但最简单的方法 - 使用 OkHttp API 配置并发级别,例如,这是 default concurrency strategy of OkHttp

但是如果您将自己的 Dispatcher 实例设置为 OkHttpClient.Builder

,您可以拥有自己的策略

当然你也可以使用协程

您当前的实现是不正确的,因为您为每个项目创建了协程调度程序,但是要共享线程池,所有协程都应该使用相同的调度程序,只需将 newFixedThreadPoolContext 创建移动到循环之外(现在您有 1000 个调度程序,每个调度程序有 10 个线程)。

但我不建议你使用协程+阻塞调用,最好配置 OkHttp 并发(它更灵活)并使用非阻塞调用的协程(你可以编写自己的适配器或使用现有的库,如 kotlin-coroutines-retrofit).它将允许您混合使用 http 请求和 UI 代码或其他任务。

所以如果你使用非阻塞API + OkHttp内部并发,你不需要有特殊的代码来控制并发,当然你可以像你的例子那样限制并发调用数上面(使用固定的调度程序构造),但我真的不认为它有多大意义,因为你可以降低并发级别,而不是增加它。

转向非阻塞 API 后,您可以 运行 任何协程调度程序中的所有协程并行(即使在 UI 线程中)并等待结果而不会阻塞。

此外,使用 OkHttpClient 配置隐式控制并发在体系结构方面看起来是更正确的方式(您可以拥有配置 Retrofit + OkHttp 的 DI 代码,并使用预配置的并发策略将其提供给您的客户端代码)。当然,您可以使用其他方法来实现,但我觉得这个更自然。