KafkaConsumer.commitSync() 实际提交了什么?
What does KafkaConsumer.commitSync() actually commit?
KafkaConsumer.commitSync 是否只是像 JavaDoc 声称的那样提交 "offsets returned on the last poll()"(这可能会遗漏一些未包含在最后 poll
结果中的分区),或者它实际上正在提交最新的位置所有 个订阅分区?询问是因为代码建议后者,考虑到 allConsumed
:
https://github.com/apache/kafka/blob/2.4.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1387
@Override
public void commitSync(Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing the current consumed offsets");
}
} finally {
release();
}
}
它只提交实际轮询和处理的偏移量。如果上次轮询中未包含某些偏移量,则不会提交这些偏移量。
它将不提交所有订阅分区的最新位置。这会干扰 Kafka 的 Consumer Offset 管理概念,以便能够在它停止的地方重新启动应用程序。
根据我的理解,allConsumed
等同于 all offsets included in the last poll
,commitSync
的注释也记录了
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
KafkaConsumer.commitSync 是否只是像 JavaDoc 声称的那样提交 "offsets returned on the last poll()"(这可能会遗漏一些未包含在最后 poll
结果中的分区),或者它实际上正在提交最新的位置所有 个订阅分区?询问是因为代码建议后者,考虑到 allConsumed
:
https://github.com/apache/kafka/blob/2.4.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1387
@Override
public void commitSync(Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing the current consumed offsets");
}
} finally {
release();
}
}
它只提交实际轮询和处理的偏移量。如果上次轮询中未包含某些偏移量,则不会提交这些偏移量。
它将不提交所有订阅分区的最新位置。这会干扰 Kafka 的 Consumer Offset 管理概念,以便能够在它停止的地方重新启动应用程序。
根据我的理解,allConsumed
等同于 all offsets included in the last poll
,commitSync
的注释也记录了
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.