Kafka Connect - 不适用于更新操作
Kafka Connect - Not working for UPDATE operation
我是 Kafka-Connect 源和接收器的新手。我创建了应用程序以将 Table 数据从一个模式 (Schema1) 传输到另一个模式 (Schema2),这里我使用 Oracle 作为数据库。我成功地将data/row从Table“Schema1.Header”转移到Table“Schema2.Header”进行INSERT操作,但是无法使用下面提到的配置进行更新操作。
源配置:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
"connection.user": "USER",
"connection.password": "user1234",
"dialect.name": "OracleDatabaseDialect",
"topic.prefix": "Schema1.Header",
"incrementing.column.name": "SC_NO",
"mode": "incrementing",
"query": "SELECT * FROM (SELECT HEADER_V1.* FROM Schema1.Header HEADER_V1 INNER JOIN Schema1.LINE_V1 LINE_V1 ON HEADER_V1.SC_NO = LINE_V1.SC_NO AND LINE_V1.CLNAME_CODE ='XXXXXX' AND HEADER_V1.ITEM_TYPE = 'XXX')",
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "col_3,col_10"
}
接收器配置:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
"connection.user": "USER2",
"connection.password": "user21234",
"dialect.name": "OracleDatabaseDialect",
"topics": "Schema1.Header",
"table.name.format": "Schema2.Header",
"tasks.max": "1"
}
请帮我解决这个问题。
注意:我只需要在 Schema Schema1.Tables 中执行所有 CRUD 操作,使用 Kafka connect 将这些数据传输到另一个 Schema Schema2.Tables。新插入的 data/row 已传输但更新的 data/row 未通过 Kafka-Connect 传输。我必须做些什么才能做到这一点?
根据这个 blog 你需要将 mode
设置为 timestamp
(或者更好的 timestamp+incrementing
如果你想 new 和 已更新 行)在您的源配置中。
此外,您还需要指定 timestamp.column.name
,它应指向每次更新行时更新的时间戳列。
我是 Kafka-Connect 源和接收器的新手。我创建了应用程序以将 Table 数据从一个模式 (Schema1) 传输到另一个模式 (Schema2),这里我使用 Oracle 作为数据库。我成功地将data/row从Table“Schema1.Header”转移到Table“Schema2.Header”进行INSERT操作,但是无法使用下面提到的配置进行更新操作。
源配置:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
"connection.user": "USER",
"connection.password": "user1234",
"dialect.name": "OracleDatabaseDialect",
"topic.prefix": "Schema1.Header",
"incrementing.column.name": "SC_NO",
"mode": "incrementing",
"query": "SELECT * FROM (SELECT HEADER_V1.* FROM Schema1.Header HEADER_V1 INNER JOIN Schema1.LINE_V1 LINE_V1 ON HEADER_V1.SC_NO = LINE_V1.SC_NO AND LINE_V1.CLNAME_CODE ='XXXXXX' AND HEADER_V1.ITEM_TYPE = 'XXX')",
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "col_3,col_10"
}
接收器配置:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
"connection.user": "USER2",
"connection.password": "user21234",
"dialect.name": "OracleDatabaseDialect",
"topics": "Schema1.Header",
"table.name.format": "Schema2.Header",
"tasks.max": "1"
}
请帮我解决这个问题。
注意:我只需要在 Schema Schema1.Tables 中执行所有 CRUD 操作,使用 Kafka connect 将这些数据传输到另一个 Schema Schema2.Tables。新插入的 data/row 已传输但更新的 data/row 未通过 Kafka-Connect 传输。我必须做些什么才能做到这一点?
根据这个 blog 你需要将 mode
设置为 timestamp
(或者更好的 timestamp+incrementing
如果你想 new 和 已更新 行)在您的源配置中。
此外,您还需要指定 timestamp.column.name
,它应指向每次更新行时更新的时间戳列。