如何设置 kafka connect auto.offset.reset with rest api
how to set kafka connect auto.offset.reset with rest api
我已经创建了一个接收器 kafka 连接,可以将数据转换到其他存储;当使用 kafka connect rest api
创建新连接器时,我想将 auto.offset.reset
设置为 latest
;我在配置中设置了 consumer.auto.offset.reset: latest
;
json
{
"name": "test_v14",
"config": {
"name": "test_v14",
"consumer.auto.offset.reset": "latest",
"connector.class": "...",
...
}
}
但是当task开始的时候,kafka consumer还是轮询最早的记录;将 auto.offset.reset
设置为最新的任何其他方法也是如此;
Kafka 2.3 之前
consumer.auto.offset.reset
需要在connect-distributed.properties
文件(Worker)中设置。
它不能应用于任何特定的连接器,除非该连接器 class 正在显式创建和加载它自己的消费者对象,该对象在 属性 中读取。
从 Apache Kafka 2.3 开始,现在可以将其设置为连接器配置的一部分。
在 worker 集合上:
connector.client.config.override.policy=All
然后在连接器中你可以指定
"consumer.override.auto.offset.reset": "latest"
有关详细信息,请参阅此内容:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/
我已经创建了一个接收器 kafka 连接,可以将数据转换到其他存储;当使用 kafka connect rest api
创建新连接器时,我想将 auto.offset.reset
设置为 latest
;我在配置中设置了 consumer.auto.offset.reset: latest
;
json
{
"name": "test_v14",
"config": {
"name": "test_v14",
"consumer.auto.offset.reset": "latest",
"connector.class": "...",
...
}
}
但是当task开始的时候,kafka consumer还是轮询最早的记录;将 auto.offset.reset
设置为最新的任何其他方法也是如此;
Kafka 2.3 之前
consumer.auto.offset.reset
需要在connect-distributed.properties
文件(Worker)中设置。
它不能应用于任何特定的连接器,除非该连接器 class 正在显式创建和加载它自己的消费者对象,该对象在 属性 中读取。
从 Apache Kafka 2.3 开始,现在可以将其设置为连接器配置的一部分。
在 worker 集合上:
connector.client.config.override.policy=All
然后在连接器中你可以指定
"consumer.override.auto.offset.reset": "latest"
有关详细信息,请参阅此内容:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/