JDBC 接收器连接器 insert/upsert 基于最大时间戳?
JDBC sink connector insert/upsert based on max timestamp?
我对 Kafka 连接还很陌生
我正在将来自多个来源的记录插入一个 table。
在某些情况下,某些记录可能先于其他记录到达。
由于我无法控制哪个源将首先提取哪条记录,因此我想添加对记录的时间戳键的检查。
我的模式中有一个名为 "LastModified_timestamp" 的键,我在其中存储记录的最新状态的时间戳。
我想向我的 JDBC 接收器连接器添加一个检查,我可以在其中根据比较 LastModified_timestamp
的值更新记录
我想忽略时间戳较旧的记录,只想 upsert/insert 最新的记录。我找不到任何配置来实现这个
有什么方法可以实现吗?
在这种情况下编写自定义查询会有帮助吗?
JDBC 接收器连接器不支持这种功能。您有两个选择要考虑:
单消息转换 (SMT) - 这些在记录通过 Kafka Connect 时将逻辑应用于记录。 SMT 非常适合删除列、更改数据类型等。但是 不适合更复杂的处理和逻辑,包括需要像您这里那样跨越多个记录的逻辑
首先处理源 Kafka 主题中的数据,以应用必要的逻辑。您可以使用 Kafka Streams、KSQL 和其他几个流处理框架(例如 Spark、Flink 等)来做到这一点。如果记录比已处理的记录更早,您需要某种有状态的逻辑来计算。
你能详细描述一下你的上游数据源吗?可能有更好的方法来编排通过的数据以强制执行排序。
最终的想法是将 所有 记录放入目标数据库,然后在数据库查询中使用逻辑将其消耗到 select 最近(基于LastModified_timestamp
) 给定键的记录。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。
我对 Kafka 连接还很陌生
我正在将来自多个来源的记录插入一个 table。 在某些情况下,某些记录可能先于其他记录到达。 由于我无法控制哪个源将首先提取哪条记录,因此我想添加对记录的时间戳键的检查。
我的模式中有一个名为 "LastModified_timestamp" 的键,我在其中存储记录的最新状态的时间戳。
我想向我的 JDBC 接收器连接器添加一个检查,我可以在其中根据比较 LastModified_timestamp
的值更新记录我想忽略时间戳较旧的记录,只想 upsert/insert 最新的记录。我找不到任何配置来实现这个
有什么方法可以实现吗? 在这种情况下编写自定义查询会有帮助吗?
JDBC 接收器连接器不支持这种功能。您有两个选择要考虑:
单消息转换 (SMT) - 这些在记录通过 Kafka Connect 时将逻辑应用于记录。 SMT 非常适合删除列、更改数据类型等。但是 不适合更复杂的处理和逻辑,包括需要像您这里那样跨越多个记录的逻辑
首先处理源 Kafka 主题中的数据,以应用必要的逻辑。您可以使用 Kafka Streams、KSQL 和其他几个流处理框架(例如 Spark、Flink 等)来做到这一点。如果记录比已处理的记录更早,您需要某种有状态的逻辑来计算。
你能详细描述一下你的上游数据源吗?可能有更好的方法来编排通过的数据以强制执行排序。
最终的想法是将 所有 记录放入目标数据库,然后在数据库查询中使用逻辑将其消耗到 select 最近(基于LastModified_timestamp
) 给定键的记录。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。