重新启动 kafka connect sink 和 source 连接器以从头开始读取
Restart kafka connect sink and source connectors to read from beginning
我对此进行了很多搜索,但似乎没有很好的指南。
根据我的搜索,有几点需要考虑:
- 正在重置 Sink Connector 内部主题(状态、配置和偏移量)。
- 源连接器偏移实现是特定于实现的。
问:是否需要重新设置这些主题?
- 正在删除消费者组。
- 用不同的名称重新启动连接器(这也是一个选项),但这似乎不是正确的做法。
- 正在将消费者组重置为
--reset-offsets
到 --to-earliest
- 使用 REST API(是否提供重置和从头读取的功能)
重新启动接收器和源连接器以从头开始读取的最佳方法是什么?
源连接器:
- 独立模式:删除偏移文件 (
/tmp/connect.offsets
) 或更改连接器名称。
- 分布式模式:更改连接器的名称。
Sink Connector(两种模式)以下方法之一:
- 更改名称。
- 消费者组的重置偏移量。组的名称与连接器名称相同。
要重置偏移量,您必须先删除连接器,重置偏移量 (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connectorName --reset-offsets --to-earliest --execute --topic topicName
),再添加一次相同的配置
您可以查看以下问题:
源连接器分布式模式 - 有另一个选项,它正在为偏移主题生成新消息。
例如我使用 jdbc 源连接器:
在查看偏移主题时,我看到以下内容:
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true
["referrer-family-jdbc-source",{"query":"query"}] {"incrementing":100}
现在为了重置它,我只用 incrementing:0
生成另一条消息
例如:how to produce from shell with key from here
./kafka-console-producer.sh \
--broker-list `hostname`:9092 \
--topic kc-staging--offsets \
--property "parse.key=true" \
--property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}
请注意,您需要执行以下操作:
- 删除连接器。
- 按照我上面的描述生成一条带有相关偏移量的消息。
- 再次创建连接器。
有点晚了,但找到了另一种方法。只需将独立模式下的 offset.storage.file.name 设置为 dev/null:
#worker.properties
offset.storage.file.filename=/dev/null
#cmdline
connect-standalone /data/config/worker.properties /data/config/connector.properties
我对此进行了很多搜索,但似乎没有很好的指南。
根据我的搜索,有几点需要考虑:
- 正在重置 Sink Connector 内部主题(状态、配置和偏移量)。
- 源连接器偏移实现是特定于实现的。
问:是否需要重新设置这些主题?
- 正在删除消费者组。
- 用不同的名称重新启动连接器(这也是一个选项),但这似乎不是正确的做法。
- 正在将消费者组重置为
--reset-offsets
到--to-earliest
- 使用 REST API(是否提供重置和从头读取的功能)
重新启动接收器和源连接器以从头开始读取的最佳方法是什么?
源连接器:
- 独立模式:删除偏移文件 (
/tmp/connect.offsets
) 或更改连接器名称。 - 分布式模式:更改连接器的名称。
Sink Connector(两种模式)以下方法之一:
- 更改名称。
- 消费者组的重置偏移量。组的名称与连接器名称相同。
要重置偏移量,您必须先删除连接器,重置偏移量 (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connectorName --reset-offsets --to-earliest --execute --topic topicName
),再添加一次相同的配置
您可以查看以下问题:
源连接器分布式模式 - 有另一个选项,它正在为偏移主题生成新消息。 例如我使用 jdbc 源连接器: 在查看偏移主题时,我看到以下内容:
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true
["referrer-family-jdbc-source",{"query":"query"}] {"incrementing":100}
现在为了重置它,我只用 incrementing:0
生成另一条消息例如:how to produce from shell with key from here
./kafka-console-producer.sh \
--broker-list `hostname`:9092 \
--topic kc-staging--offsets \
--property "parse.key=true" \
--property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}
请注意,您需要执行以下操作:
- 删除连接器。
- 按照我上面的描述生成一条带有相关偏移量的消息。
- 再次创建连接器。
有点晚了,但找到了另一种方法。只需将独立模式下的 offset.storage.file.name 设置为 dev/null:
#worker.properties
offset.storage.file.filename=/dev/null
#cmdline
connect-standalone /data/config/worker.properties /data/config/connector.properties