apache http 异步客户端中 I/O 个调度程序的数量

number of I/O dispatchers in apache http async client

我正在使用 Apache HTTP 异步客户端发出 http 请求(通过使用来自 Kafka 总线的数据)并处理回调中的响应。

我在我的日志中发现了一些有趣的回调行为,想弄清楚为什么会这样。

这是部分代码(从kafka消费数据并发出http请求):

    while (true) {
        ConsumerRecords<String, MyMessage> records = kafkaConsumer.poll(1000);
        if (records.isEmpty()) {
            logger.info("Polling Empty ....");
            continue;
        }

        int numberOfRecords = records.count();
        final CountDownLatch batchLatch = new CountDownLatch(numberOfRecords);
        for (ConsumerRecord<String, MyMessage> record: records) {
            final HttpPost request = new HttpPost(endpoint);
            // set headers
            // set entity
            request.setEntity(messageToEntity(record.value()));
            httpClient.execute(request, someCallback);
        }

        batchLatch.await();
        kafkaConsumer.commitAsync();
    }

这里是回调的示例代码:

public class SomeCallback {
    // ...
    @Override
    public void completed(final HttpResponse response) {
        // do something
        logger.info("{} - blablabla", status, ...);
        latch.countDown();
    }

    @Override
    public void failed(final Exception ex) {
        // do something
        logger.error("{} - blablabla", FAILED, ..., ex);
        latch.countDown();
    }

    @Override
    public void cancelled() {
        // do something
        logger.error("{} - blablabla!", CANCELLED, ...);
        latch.countDown();
    }
}

所以情况是,在某个时间点,我启动了程序,它开始使用来自 kafka 的数据。每次轮询的最大记录数为 500。

并且因为Kafka上已经有很多未消费的数据,所以程序面临着很高的吞吐量。

现在,日志如下所示:

[INFO ] 2018-04-29 04:11:29.234 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 04:11:30.362 [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
...
...
[INFO ] 2018-04-29 06:28:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:28:35.363 [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
...
[INFO ] 2018-04-29 06:31:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:32:12.418 [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
...
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
// there the program stopped

以下是我无法理解的内容:

  1. 为什么每 500 个请求(响应处理)只有两个 I/O 调度程序?是因为默认设置max 2吗?
  2. 为什么每 500 个请求,I/O 调度程序的数量不断增加?通常,我之前的经验是, I/O dispatcher 的数量会增加,但也会减少到 1 和 2。我的假设是:它会重用之前的一些 I/O dispatcher ,而不是每次都创建新的。
  3. 为什么在最后出现 ConnectionClosedException?是因为太多 I/O 调度员停止了程序吗?

更新

感谢 Oleg 的评论,我发现 I/O 调度程序的数量不断增加是因为每次从 Kafka 获取数据时,都会创建一个新的 http 客户端。然后,就会有很多闲置客户端占用IOs和资源。这也是程序停止的原因。

我还有一些疑问:

  1. 日志中 I/O 调度程序的 编号 是什么意思?是不同线程的数量吗?
  2. 如何控制一个 http 客户端的最大 I/O 调度程序数?是 I/O 调度员发出的吗?
  3. 此 I/O 调度程序数与连接数之间有何区别?
  4. 如何根据我所在的机器 运行 程序和数据 throughput/size 来估计我需要的 I/O 调度器数量和连接数量?
  1. 表示线程名

  2. I/O 反应器的属性和行为可以用 IOReactorConfig

  3. 控制
  4. 少量 I/O 调度程序管理大量连接。

  5. 很可能每个 CPU 核心一个 I/O 分派线程是大多数应用程序的合理默认值,不应更改它。