如何设置 kafka connect auto.offset.reset with rest api

how to set kafka connect auto.offset.reset with rest api

我已经创建了一个接收器 kafka 连接,可以将数据转换到其他存储;当使用 kafka connect rest api 创建新连接器时,我想将 auto.offset.reset 设置为 latest;我在配置中设置了 consumer.auto.offset.reset: latest

json { "name": "test_v14", "config": { "name": "test_v14", "consumer.auto.offset.reset": "latest", "connector.class": "...", ... } }

但是当task开始的时候,kafka consumer还是轮询最早的记录;将 auto.offset.reset 设置为最新的任何其他方法也是如此;

Kafka 2.3 之前

consumer.auto.offset.reset需要在connect-distributed.properties文件(Worker)中设置。

它不能应用于任何特定的连接器,除非该连接器 class 正在显式创建和加载它自己的消费者对象,该对象在 属性 中读取。

从 Apache Kafka 2.3 开始,现在可以将其设置为连接器配置的一部分。

worker 集合上:

connector.client.config.override.policy=All

然后在连接器中你可以指定

"consumer.override.auto.offset.reset": "latest"

有关详细信息,请参阅此内容:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/