如何使用 Kafka JDBC 连接器跟踪具有特定列值的行(按 ID)?

How to track rows (by id) with a specific column value using Kafka JDBC Connector?

我有一个 table 包含大量记录。有一列定义记录的类型。我想收集该列中具有特定值的记录。种类:

Select * FROM myVeryOwnTable WHERE type = "VERY_IMPORTANT_TYPE" 

我注意到当我选择增量(+时间戳)模式时,我不能在自定义查询中使用 WHERE 子句,否则如果我自己过滤,我需要小心。 我想实现的背景是我使用 Logstash 将某种类型的数据从 MySQL 传输到 ES。通过使用可以包含 where 子句的查询,这很容易实现。但是,使用 Kafka,在数据库中插入新行后,我可以更快(几乎立即)传输我的数据。

感谢您的任何提示或建议。


多亏了@wardziniak,我才能够进行设置。

query=select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p
topic.prefix=test-mysql-jdbc-
incrementing.column.name=id

但是,我期待一个主题 test-mysql-jdbc-myVeryOwnTable,所以我已将我的消费者注册到该主题。但是,使用上面显示的查询 table name 被跳过,所以我的主题被命名为完全按照上面定义的前缀。所以我刚刚更新了我的属性 topic.prefix=test-mysql-jdbc-myVeryOwnTable,它似乎工作正常。

您可以在 Jdbc 源连接器 query 属性.

中使用子查询

示例 JDBC 源连接器配置:

{
    ...
    "query": "select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p",
    "incrementing.column.name": "id",
    ...
}