max.in.flight.requests.per.connection 和 Spring Kafka Producer 使用 KafkaTemplate 同步事件发布

max.in.flight.requests.per.connection and Spring Kafka Producer Synchronous Event Publishing with KafkaTemplate

我对 Kafka Producers 的 max.in.flight.requests.per.connection 和使用 Spring-Kafka 同步发布事件之间的关系有点困惑,希望有人能够澄清二.

我正在寻找使用 Spring Kafka 的 KafkaTemplate 与 Spring Kafka 设置同步事件发布。 The Spring Kafka documentation 提供了使用 ListenableFutureget(SOME_TIME, TimeUnit) 启用事件同步发布的示例(复制如下以供参考)。

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

我在看Kafka's Producer Configuration Documentation,看到Kafka有一个配置max.in.flight.requests.per.connection,它负责Kafka中的以下设置。

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

当异步处理事件发布时,max.in.flight.requests.per.connection给set to value 1赋什么值?将 max.in.flight.requests.per.connection 的值设置为 1 是否会强制同步发布 Kafka Producer 的事件?如果我想为 Kafka Producer 设置事件的同步发布并采用 Spring-Kafka 推荐的方法,我应该关注 max.in.flight.requests.per.connection 还是忽略它是否安全?

我不相信他们有任何关系。发送仍然是异步的;将它设置为一个意味着第二个将阻塞直到第一个完成。

 future1 = template.send(...);
 future2 = template.send(...); // this will block
 future1.get(); // and this will return almost immediately
 future2.get();

你还需要得到未来的结果,来测试success/failure。