Kafka Connect 手动 offset/commit 管理
Kafka Connect manual offset/commit management
我正在尝试处理我正在处理的自定义 Kafka 连接器中的偏移提交。
我已经尝试在连接器配置中配置这个 - "consumer.enable.auto.commit": "false".
此外,我重写了 class 中继承自 SinkTask class 的 preCommit 方法,因此它 returns 是一个空映射,因为根据文档,这是必需的为了手动管理偏移量(参考https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit-java.util.Map-)。
我也试过用空映射调用 flush 方法。
然而,通过上述所有尝试以及这些尝试的排列组合,消息仍然被提交并且没有被重新处理(明确地说,我的目标是如果消息没有被提交,它将在下一个轮询间隔)。
还有什么办法可以让消费的消息在下一次轮询时不再提交消费?
这里的问题是我对 Kafka 的工作原理缺乏了解。
基本上在内存中跟踪偏移进度,flush/preCommit 只是为了获取内存中的内容并写入磁盘。
这意味着刷新较旧的偏移量 仅 下次消费者将重新启动时(因为它从磁盘读取它需要启动的偏移量)。
SinkTaskContext 中还有另一个名为“offset”的方法,它获取主题分区和偏移量的映射并将其设置在内存中供消费者使用。
KafkaConnect运行时,在每次轮询之前,在内存中获取跟踪的偏移量,如果需要,根据前面提到的偏移量方法存储的映射调用underline consumer seek方法。
反过来,这实际上会让消费者读取在“offset”方法中设置的偏移量。
我正在尝试处理我正在处理的自定义 Kafka 连接器中的偏移提交。
我已经尝试在连接器配置中配置这个 - "consumer.enable.auto.commit": "false".
此外,我重写了 class 中继承自 SinkTask class 的 preCommit 方法,因此它 returns 是一个空映射,因为根据文档,这是必需的为了手动管理偏移量(参考https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit-java.util.Map-)。
我也试过用空映射调用 flush 方法。
然而,通过上述所有尝试以及这些尝试的排列组合,消息仍然被提交并且没有被重新处理(明确地说,我的目标是如果消息没有被提交,它将在下一个轮询间隔)。
还有什么办法可以让消费的消息在下一次轮询时不再提交消费?
这里的问题是我对 Kafka 的工作原理缺乏了解。 基本上在内存中跟踪偏移进度,flush/preCommit 只是为了获取内存中的内容并写入磁盘。
这意味着刷新较旧的偏移量 仅 下次消费者将重新启动时(因为它从磁盘读取它需要启动的偏移量)。
SinkTaskContext 中还有另一个名为“offset”的方法,它获取主题分区和偏移量的映射并将其设置在内存中供消费者使用。
KafkaConnect运行时,在每次轮询之前,在内存中获取跟踪的偏移量,如果需要,根据前面提到的偏移量方法存储的映射调用underline consumer seek方法。
反过来,这实际上会让消费者读取在“offset”方法中设置的偏移量。