在 Apache Kafka JDBC 中设置 Table/Topic 顺序

Set Table/Topic order in Apache Kafka JDBC

有 2 个主题,source_topic.asource_topic.bsource_topic.asource_topic.b 有依赖关系(例如需要下沉 source_topic.b 第一)。为了说明sink过程,需要先从source_topic.b中sink数据,然后再从source_topic.a中sink数据。有什么方法可以在 source/sink 配置中设置主题/table 的顺序?

以下是使用的配置,有多个table和主题。 timestamp 用于每次轮询时更新 table 的模式。并且 timestamp.initial 将值设置为特定的时间戳。

源配置

name=jdbc-mssql-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver:
connection.user=
connection.password=
topic.prefix= source_topic.
mode=timestamp
table.whitelist=A,B,C
timestamp.column.name=ModifiedDateTime

connection.backoff.ms=60000
connection.attempts=300

validate.non.null= false
# enter timestamp in milliseconds 
timestamp.initial= 1604977200000 

接收器配置

name=mysql-sink-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics= sink_topic_a, sink_topic_b
connection.url=jdbc:mysql:
connection.user=
connection.password=

insert.mode=upsert
delete.enabled=true
pk.mode=record_key


errors.log.enable= true
errors.log.include.messages=true

不,JDBC 接收器连接器不支持这种逻辑。

您正在将批处理思维应用于流世界 :) 考虑一下:Kafka 如何知道它已经“完成”下沉 topic_a?流是无限的,所以你最终不得不说“如果你在给定时间内没有收到更多消息window那么假设你已经完成了从这个接收数据主题并转到下一个".

您可能最好在 Kafka 本身(例如使用 Kafka Streams 或 ksqlDB)中对数据进行必要的连接,然后将结果写回一个新的 Kafka 主题,然后将其下沉到您的数据库中。