重置 JDBC Kafka Connector 以从时间开始拉取行?

Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?

Kafka 连接器可以使用主键和时间戳来确定需要处理哪些行。

我正在寻找一种方法来重置连接器,以便它从一开始就处理。

因为要求是在分布式模式下运行,所以最简单的做法是将连接器名称更新为新值。这将提示在 connect-offsets 主题中创建一个新条目,因为它看起来像一个全新的连接器。然后连接器应该再次开始读取,就好像什么都没有写入 Kafka 一样。您还可以手动将逻辑删除消息发送到与该特定连接器关联的连接偏移主题中的键,但重命名比处理它容易得多。此方法适用于所有源连接器,而不仅仅是此处描述的 JDBC。

我厌倦了在开发过程中每次都重命名连接器,所以开始使用逻辑删除方法。此方法可用于任何源连接器。

首先检查连接器key/value的格式:

kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --from-beginning --property print.key=true
["demo",{"query":"query"}] {"timestamp_nanos":542000000,"timestamp":1535768081542}
["demo",{"query":"query"}] {"timestamp_nanos":171831000,"timestamp":1540435281171}
["demo",{"query":"query"}] {"timestamp_nanos":267775000,"timestamp":1579522539267}

通过发送不带任何值的密钥创建逻辑删除消息:

echo '["demo",{"query":"query"}]#' | kafka-console-producer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --property "parse.key=true" --property "key.separator=#"

现在重新启动或重新创建连接器,它将再次开始生成消息。

除非您真的知道自己在做什么,否则在生产中要非常小心。这里有更多信息:https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/