Kafka-Connect JDBC Sink 在 upsert 期间报告 null id
Kafka-Connect JDBC Sink reports null id during upsert
我是 Kafka / Kafka Connect 的新手,我 运行 遇到了融合 JDBC 连接器的问题。目前我正在使用 Confluent Community docker compose.
我可以成功创建一个从 mysql 数据库读取到 kafka 的源。
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "college_mysql_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "mode": "timestamp+incrementing", "timestamp.column.name": "updated_on", "topic.prefix": "college_mysql_", "poll.interval.ms": 1000, "table.whitelist": "college" } }' \
http://localhost:8083/connectors
数据按预期进入 Kafka,每一列都在 Avro 中正确表示。如果我通过 CLI 创建消费者,我可以看到数据是正确的。
{
"id":112525,
"pim_id":{"long":78806},
"college_name":{"string":"Western University of Health Sciences"},
...
}
如果我创建一个简单的 JDBC 接收器将数据放入另一个 mysql 数据库,一切都很好:
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "insert"}}' \
http://localhost:8083/connectors
我们正确地创建了一个 table 并且新记录在所有字段(包括 id)正确填充的情况下都很好。但是,如果我改为创建一个使用插入模式 upsert 的接收器,我就会开始出错。
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id"}}' \
http://localhost:8083/connectors
这确实正确地创建了 table 并正确地建立了 id
作为主键,到目前为止一切顺利,但是现在每当它从主题中读取时我们都会得到一个错误:
java.sql.BatchUpdateException: Column 'id' cannot be null
这就是我卡住的地方。主题中的数据正确地具有一个 ID 字段,如果我没有将该列声明为 PK,则该 ID 字段用于 ID 列。我尝试自己定义 table 而不是让接收器创建 table,我想 table 的创建可能有一些奇怪的问题,但这似乎不是无论如何我都会得到完全相同的错误。对此有任何建议或指导,我将不胜感激,我希望解决方案很简单,我只是遗漏了一些显而易见的东西,那些有更多经验的人可以向我指出。
谢谢!
您需要设置“pk.mode”:“record_value”
我是 Kafka / Kafka Connect 的新手,我 运行 遇到了融合 JDBC 连接器的问题。目前我正在使用 Confluent Community docker compose.
我可以成功创建一个从 mysql 数据库读取到 kafka 的源。
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "college_mysql_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "mode": "timestamp+incrementing", "timestamp.column.name": "updated_on", "topic.prefix": "college_mysql_", "poll.interval.ms": 1000, "table.whitelist": "college" } }' \
http://localhost:8083/connectors
数据按预期进入 Kafka,每一列都在 Avro 中正确表示。如果我通过 CLI 创建消费者,我可以看到数据是正确的。
{
"id":112525,
"pim_id":{"long":78806},
"college_name":{"string":"Western University of Health Sciences"},
...
}
如果我创建一个简单的 JDBC 接收器将数据放入另一个 mysql 数据库,一切都很好:
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "insert"}}' \
http://localhost:8083/connectors
我们正确地创建了一个 table 并且新记录在所有字段(包括 id)正确填充的情况下都很好。但是,如果我改为创建一个使用插入模式 upsert 的接收器,我就会开始出错。
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id"}}' \
http://localhost:8083/connectors
这确实正确地创建了 table 并正确地建立了 id
作为主键,到目前为止一切顺利,但是现在每当它从主题中读取时我们都会得到一个错误:
java.sql.BatchUpdateException: Column 'id' cannot be null
这就是我卡住的地方。主题中的数据正确地具有一个 ID 字段,如果我没有将该列声明为 PK,则该 ID 字段用于 ID 列。我尝试自己定义 table 而不是让接收器创建 table,我想 table 的创建可能有一些奇怪的问题,但这似乎不是无论如何我都会得到完全相同的错误。对此有任何建议或指导,我将不胜感激,我希望解决方案很简单,我只是遗漏了一些显而易见的东西,那些有更多经验的人可以向我指出。
谢谢!
您需要设置“pk.mode”:“record_value”