Kafka 连接 JDBC 接收器 quote.sql.identifiers 不工作
Kafka Connect JDBC Sink quote.sql.identifiers not working
我正在尝试使用 Kafka Connect 通过 JDBC 源和接收器连接器将数据从旧 DB2 数据库同步到 Postgres 数据库。它工作正常,但前提是我对 table 名称使用的大小写非常严格。
例如,我在 DB2 中有一个名为 ACTION 的 table,它也存在于具有相同列等的 Postgres 中。唯一的区别是在 DB2 中它是大写 ACTION
而在 Postgres 中小写 action
.
这是一个有效的接收器文件:
{
"name": "jdbc_sink_pg_action",
"config": {
"_comment": "The JDBC connector class",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"_comment": "How to serialise the value of keys ",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": " --- JDBC-specific configuration below here --- ",
"_comment": "JDBC connection URL.",
"connection.url": "jdbc:postgresql://localhost:5435/postgres",
"connection.user": "postgres",
"connection.password": "*****",
"topics": "ACTION",
"table.name.format": "action",
"_comment": "The insertion mode to use",
"insert.mode": "upsert",
"_comment": "The primary key mode",
"pk.mode": "record_value",
"_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
"pk.fields": "ACTION_ID",
"quote.sql.identifiers": "never"
}
}
这样还可以,但不是很灵活。例如,我有许多其他 table,我也想同步它们,但我不想为每个 table 创建一个连接器文件。所以我尝试使用:
"table.name.format": "${topic}",
执行此操作时,当我尝试加载我的接收器连接器时,我在日志中收到以下错误:
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "ACTION"
is missing and auto-creation is disabled
所以在我看来 "quote.sql.identifiers": "never"
实际上并没有工作,否则接收器连接器正在执行的查询将不被引用并且它会允许任何情况(它会转换为更低)。
为什么这不起作用?如果我只使用 ACTION
作为 table.name.format.
,我会得到相同的结果
您的 PostgreSQL table 名称 (action
) 不等于主题名称 (ACTION
)。
Kafka Connect JDBC Connector uses getTables()
方法检查 table 是否存在,其中 tableNamePattern
参数区分大小写(根据文档:must match the table name as it is stored in the database
).
您可以使用 ChangeTopicCase
transformation from Kafka Connect Common Transformations.
我正在尝试使用 Kafka Connect 通过 JDBC 源和接收器连接器将数据从旧 DB2 数据库同步到 Postgres 数据库。它工作正常,但前提是我对 table 名称使用的大小写非常严格。
例如,我在 DB2 中有一个名为 ACTION 的 table,它也存在于具有相同列等的 Postgres 中。唯一的区别是在 DB2 中它是大写 ACTION
而在 Postgres 中小写 action
.
这是一个有效的接收器文件:
{
"name": "jdbc_sink_pg_action",
"config": {
"_comment": "The JDBC connector class",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"_comment": "How to serialise the value of keys ",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": " --- JDBC-specific configuration below here --- ",
"_comment": "JDBC connection URL.",
"connection.url": "jdbc:postgresql://localhost:5435/postgres",
"connection.user": "postgres",
"connection.password": "*****",
"topics": "ACTION",
"table.name.format": "action",
"_comment": "The insertion mode to use",
"insert.mode": "upsert",
"_comment": "The primary key mode",
"pk.mode": "record_value",
"_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
"pk.fields": "ACTION_ID",
"quote.sql.identifiers": "never"
}
}
这样还可以,但不是很灵活。例如,我有许多其他 table,我也想同步它们,但我不想为每个 table 创建一个连接器文件。所以我尝试使用:
"table.name.format": "${topic}",
执行此操作时,当我尝试加载我的接收器连接器时,我在日志中收到以下错误:
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "ACTION" is missing and auto-creation is disabled
所以在我看来 "quote.sql.identifiers": "never"
实际上并没有工作,否则接收器连接器正在执行的查询将不被引用并且它会允许任何情况(它会转换为更低)。
为什么这不起作用?如果我只使用 ACTION
作为 table.name.format.
您的 PostgreSQL table 名称 (action
) 不等于主题名称 (ACTION
)。
Kafka Connect JDBC Connector uses getTables()
方法检查 table 是否存在,其中 tableNamePattern
参数区分大小写(根据文档:must match the table name as it is stored in the database
).
您可以使用 ChangeTopicCase
transformation from Kafka Connect Common Transformations.