使用自定义(ConsumerAware)错误处理程序时如何寻求和提交?
How to seek & commit when using custom (ConsumerAware) error handler?
情况
我正在尝试为 Spring-Kafka(2.3.3.RELEASE) batch 消息侦听器实现自定义错误处理程序。我的配置很简单:
val factory = ConcurrentKafkaListenerContainerFactory<Int, String>()
factory.consumerFactory = kafkaConsumerFactory
factory.isBatchListener = true
factory.setBatchErrorHandler(MyErrorHandler())
没有任何额外的消费者配置。请注意 "Starting with version 2.3, the framework sets enable.auto.commit to false".
我正在使用 "consumer aware" @KafkaListener
并且我的自定义错误处理程序也实现了 ConsumerAwareBatchErrorHandler
.
在我的错误处理程序中,我使用 Consumer.seek(topicPartition, offset)
方法寻找下一个或寻找当前偏移量。
问题是每当调用错误处理程序并且我手动寻找某个偏移量时,偏移量似乎并没有提交给 Kafka。事实证明,在应用程序重新启动后,它会轮询相同的(先前失败的)记录(即使代码寻找下一个)。
更让我困惑的是,虽然应用程序是 运行,但它似乎确实使用了 seek()
的偏移量并轮询 new 记录。
我试过使用不同的 AckMode
,例如 MANUAL
或 MANUAL_IMMIDIATE
,但这并没有改变观察到的行为。文档还针对这些模式说:
MANUAL, and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener
显然我没有使用。
如果已经找到 "fix",它在 .seek()
之后调用 consumer.commitSync()
,这似乎会立即提交偏移量。这也得到了日志记录的支持:
DEBUG 13784 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=kafka-retry-test] Committed offset 28 for partition test-retry-batch-custom-e-0
问题
为什么只调用 consumer.seek(...)
不向 kafka 提交偏移量,我是否缺少一些关键配置?调用 .commitSync()
似乎是一个 hack,并且在任何地方都需要的文档中没有描述。
查找不会提交偏移量,它只是将当前消费者定位在该点。您需要致电 syncCommit
或 asyncCommit
.
但是,从版本 2.3.2(当前版本为 2.3.4)开始,您可以 return true from isAckAfterHandle()
:
/**
* Return true if the offset should be committed for a handled error (no exception
* thrown).
* @return true to commit.
* @since 2.3.2
*/
default boolean isAckAfterHandle() {
// TODO: Default true in the next release.
return false;
}
容器将为您完成提交。
Starting with version 2.3.2, these interfaces have a default method isAckAfterHandle()
which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception. This returns false by default, for backwards compatibility. In most cases, however, we expect that the offset should be committed. For example, the SeekToCurrentErrorHandler returns true if a record is recovered (after any retries, if so configured). In a future release, we expect to change this default to true.
但是,如果您使用手动确认,则您的错误处理程序必须手动提交。
情况
我正在尝试为 Spring-Kafka(2.3.3.RELEASE) batch 消息侦听器实现自定义错误处理程序。我的配置很简单:
val factory = ConcurrentKafkaListenerContainerFactory<Int, String>()
factory.consumerFactory = kafkaConsumerFactory
factory.isBatchListener = true
factory.setBatchErrorHandler(MyErrorHandler())
没有任何额外的消费者配置。请注意 "Starting with version 2.3, the framework sets enable.auto.commit to false".
我正在使用 "consumer aware" @KafkaListener
并且我的自定义错误处理程序也实现了 ConsumerAwareBatchErrorHandler
.
在我的错误处理程序中,我使用 Consumer.seek(topicPartition, offset)
方法寻找下一个或寻找当前偏移量。
问题是每当调用错误处理程序并且我手动寻找某个偏移量时,偏移量似乎并没有提交给 Kafka。事实证明,在应用程序重新启动后,它会轮询相同的(先前失败的)记录(即使代码寻找下一个)。
更让我困惑的是,虽然应用程序是 运行,但它似乎确实使用了 seek()
的偏移量并轮询 new 记录。
我试过使用不同的 AckMode
,例如 MANUAL
或 MANUAL_IMMIDIATE
,但这并没有改变观察到的行为。文档还针对这些模式说:
MANUAL, and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener
显然我没有使用。
如果已经找到 "fix",它在 .seek()
之后调用 consumer.commitSync()
,这似乎会立即提交偏移量。这也得到了日志记录的支持:
DEBUG 13784 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=kafka-retry-test] Committed offset 28 for partition test-retry-batch-custom-e-0
问题
为什么只调用 consumer.seek(...)
不向 kafka 提交偏移量,我是否缺少一些关键配置?调用 .commitSync()
似乎是一个 hack,并且在任何地方都需要的文档中没有描述。
查找不会提交偏移量,它只是将当前消费者定位在该点。您需要致电 syncCommit
或 asyncCommit
.
但是,从版本 2.3.2(当前版本为 2.3.4)开始,您可以 return true from isAckAfterHandle()
:
/**
* Return true if the offset should be committed for a handled error (no exception
* thrown).
* @return true to commit.
* @since 2.3.2
*/
default boolean isAckAfterHandle() {
// TODO: Default true in the next release.
return false;
}
容器将为您完成提交。
Starting with version 2.3.2, these interfaces have a default method
isAckAfterHandle()
which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception. This returns false by default, for backwards compatibility. In most cases, however, we expect that the offset should be committed. For example, the SeekToCurrentErrorHandler returns true if a record is recovered (after any retries, if so configured). In a future release, we expect to change this default to true.
但是,如果您使用手动确认,则您的错误处理程序必须手动提交。