使用并发时如何使用 ConsumerAwareErrorHandler 提交偏移量?

How to commit offset with ConsumerAwareErrorHandler when using concurrency?

在发生错误的情况下,ConsumerAwareErrorHandler 接口提供对 ConsumerRecord 和 Consumer in handle 方法的引用。现在在处理错误后,我们可以提交偏移量或忽略。但我想知道,当我们使用多个线程 (ConcurrentKafkaListenerContainerFactory.setConcurrency) 时它会如何表现。如果最新的记录由于不同的原因在前一个记录之前被错误输出,我们为最新的记录提交偏移量,但是前一个记录失败并出现致命错误,我们实际上应该没有提交任何偏移量。

我将 AckMode.COUNT 与 !ackOnError

结合使用

使用并发时,每个分区都分配给一个特定的 consumer/thread,因此另一个线程不可能处理来自同一分区的记录。

分区分布在线程中。