max.in.flight.requests.per.connection 和 Spring Kafka Producer 使用 KafkaTemplate 同步事件发布
max.in.flight.requests.per.connection and Spring Kafka Producer Synchronous Event Publishing with KafkaTemplate
我对 Kafka Producers 的 max.in.flight.requests.per.connection
和使用 Spring-Kafka 同步发布事件之间的关系有点困惑,希望有人能够澄清二.
我正在寻找使用 Spring Kafka 的 KafkaTemplate
与 Spring Kafka 设置同步事件发布。 The Spring Kafka documentation 提供了使用 ListenableFuture
的 get(SOME_TIME, TimeUnit)
启用事件同步发布的示例(复制如下以供参考)。
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
我在看Kafka's Producer Configuration Documentation,看到Kafka有一个配置max.in.flight.requests.per.connection
,它负责Kafka中的以下设置。
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
当异步处理事件发布时,max.in.flight.requests.per.connection
给set to value 1赋什么值?将 max.in.flight.requests.per.connection
的值设置为 1 是否会强制同步发布 Kafka Producer 的事件?如果我想为 Kafka Producer 设置事件的同步发布并采用 Spring-Kafka 推荐的方法,我应该关注 max.in.flight.requests.per.connection
还是忽略它是否安全?
我不相信他们有任何关系。发送仍然是异步的;将它设置为一个意味着第二个将阻塞直到第一个完成。
future1 = template.send(...);
future2 = template.send(...); // this will block
future1.get(); // and this will return almost immediately
future2.get();
你还需要得到未来的结果,来测试success/failure。
我对 Kafka Producers 的 max.in.flight.requests.per.connection
和使用 Spring-Kafka 同步发布事件之间的关系有点困惑,希望有人能够澄清二.
我正在寻找使用 Spring Kafka 的 KafkaTemplate
与 Spring Kafka 设置同步事件发布。 The Spring Kafka documentation 提供了使用 ListenableFuture
的 get(SOME_TIME, TimeUnit)
启用事件同步发布的示例(复制如下以供参考)。
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
我在看Kafka's Producer Configuration Documentation,看到Kafka有一个配置max.in.flight.requests.per.connection
,它负责Kafka中的以下设置。
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
当异步处理事件发布时,max.in.flight.requests.per.connection
给set to value 1赋什么值?将 max.in.flight.requests.per.connection
的值设置为 1 是否会强制同步发布 Kafka Producer 的事件?如果我想为 Kafka Producer 设置事件的同步发布并采用 Spring-Kafka 推荐的方法,我应该关注 max.in.flight.requests.per.connection
还是忽略它是否安全?
我不相信他们有任何关系。发送仍然是异步的;将它设置为一个意味着第二个将阻塞直到第一个完成。
future1 = template.send(...);
future2 = template.send(...); // this will block
future1.get(); // and this will return almost immediately
future2.get();
你还需要得到未来的结果,来测试success/failure。