RxJava:网络调用的线程池

RxJava: Thread pool for network calls

我正在尝试创建一种机制来限制并发网络请求的数量。我的想法是我想要一个固定的线程池,比如 20 个线程,并使用该池最多只允许 20 个传出 HTTP 请求。

我一直在努力做的是:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

但这不起作用。我已经尝试将 maxRequests 设置为 1,发送 5 个请求,然后在接收请求的服务器上设置断点,以便将第一个请求 "stuck" 保留在那里,以便查看如果其他 4 个等待可用线程。但是他们 5 个都执行了,过了一会儿,我在所有 5 个请求上都得到了超时异常。

我也试过使用observeOn,但也没用。

编辑:我还尝试使用以下代码实现信号量逻辑:


public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

其中 Semaphore 是信号量的本机 Java 实现。如果 maxRequests 为 2,并且我发送了 5 个请求,那么这个机制会按预期工作,其中 2 个会出去,另外 3 个会卡在 fromAction 中等待。但是这种方法伴随着其他意想不到的行为,例如即使在 2 个请求收到响应之后,其他 3 个请求中的 none 也会执行,因为 .doFinally(concurrentRequestsSemaphore::release) 从未执行过。我做了一些测试,它只在 X 请求收到响应后才执行。 X 会变成什么样子是完全无法预测的。因此,可能会有 20 个许可的信号量,发出 20 个请求并返回响应,并且不会执行其他请求,因为信号量从未被任何请求释放。

您没有显示 private Single<...> httpRequest() 的正文。我假设你在那里调用了一些异步方法。异步方法只占用线程来处理响应,当请求本身在服务器之间来回移动时,不使用任何线程。这解释了为什么您看到所有 5 个请求都到达了服务器。 通常,为了限制某种活动的数量,使用 java.util.concurrent.Semaphores,但它们通过阻塞线程来限制活动。从逻辑上讲,由于您的程序是异步的,因此您需要使用异步信号量,但它是一种罕见的野兽。 所以你有以下选择:

  • 完全不要限制异步 http 请求的数量,因为它们不会占用太多资源
  • 启动一个特殊的线程,从普通的同步信号量获取许可,然后启动异步http请求。当请求完全完成时释放信号量
  • 使用固定线程池同步发起 http 请求
  • 使用异步信号量。我知道的唯一实现是在我的库中 DF4JAsyncSemaphore is an extension of the standard Semaphore and so has both synchronous and asynchronous interface, and InpSignal used only in asynchronous programs. An example of InpSignal usage is at AsyncServerSocketChannel.java,用于限制在 Echo Server 实现中打开的客户端连接数。