Spring Kafka 消费者在运行时轮询特定的偏移量
Spring Kafka consumer polling to specific offsets at runtime
在我们的 kafka 消费者中使用 spring kafka。根据我的业务需求,如果处理该批次失败,我需要再次轮询回同一批次的记录。
根据 https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html,部分:-“偏移量和消费者位置” 说 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者用户相关的位置概念:-
消费者的位置给出了下一条将要发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者在对 poll(Duration) 的调用中收到消息时,它都会自动前进。
提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一种提交 API(例如 commitSync 和 commitAsync)来手动控制此提交位置。
对于我的用例,我想控制第一个。有什么办法吗?
SeekToCurrentErrorHandler 将重新定位消费者,以便在侦听器抛出异常时重新传送失败的记录。
执行ConsumerSeekAware在启动时寻找开始。
在我们的 kafka 消费者中使用 spring kafka。根据我的业务需求,如果处理该批次失败,我需要再次轮询回同一批次的记录。 根据 https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html,部分:-“偏移量和消费者位置” 说 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者用户相关的位置概念:-
消费者的位置给出了下一条将要发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者在对 poll(Duration) 的调用中收到消息时,它都会自动前进。
提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一种提交 API(例如 commitSync 和 commitAsync)来手动控制此提交位置。
对于我的用例,我想控制第一个。有什么办法吗?
SeekToCurrentErrorHandler 将重新定位消费者,以便在侦听器抛出异常时重新传送失败的记录。
执行ConsumerSeekAware在启动时寻找开始。