Kafka Connect JDBC fails on JsonConverter
I am designing a pipeline: MySQL -> Debezium -> Kafka -> Flink -> Kafka -> Kafka Connect JDBC -> MySQL. Below is a sample message I write from Flink (I have also tried producing it with the Kafka console producer):
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "Smith"
  }
}
But the connector fails in JsonConverter with:
DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
I debugged it, and in the method public SchemaAndValue toConnectData(String topic, byte[] value) the value is null. My sink configuration is:
{
  "name": "user-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "user",
    "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
    "connection.user": "root",
    "connection.password": "root",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
Can someone help me fix this?
I don't think the problem is the value serialization of the Kafka message. It is the key of the message that is problematic.

What is your key.converter? I suspect it is the same as your value.converter (org.apache.kafka.connect.json.JsonConverter). Your key is probably a plain String that contains no schema or payload fields.

Try changing key.converter to org.apache.kafka.connect.storage.StringConverter.
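To see why that matters, here is a minimal sketch (using Kafka's own converter classes; the topic name is taken from your config) showing that a plain-string key passes StringConverter but makes JsonConverter with schemas.enable=true throw exactly the DataException you quoted:

import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class KeyConverterDemo {
    public static void main(String[] args) {
        // A key as a console producer would send it: raw bytes, no schema/payload envelope.
        byte[] plainKey = "1".getBytes(StandardCharsets.UTF_8);

        // StringConverter accepts any bytes and yields a string SchemaAndValue.
        StringConverter stringConverter = new StringConverter();
        stringConverter.configure(Map.of(), true); // true = configure as key converter
        System.out.println(stringConverter.toConnectData("user", plainKey));

        // JsonConverter with schemas.enable=true insists on a {"schema": ..., "payload": ...}
        // envelope, so the same bytes throw the DataException from the question.
        JsonConverter jsonConverter = new JsonConverter();
        jsonConverter.configure(Map.of("schemas.enable", "true"), true);
        jsonConverter.toConnectData("user", plainKey); // throws DataException
    }
}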
In Kafka Connect you can set default Converters at the worker level, but you can also set specific ones for a particular connector configuration (these override the defaults). To do that, modify your configuration request:
{
  "name": "user-sink",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "user",
    "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
    "connection.user": "root",
    "connection.password": "root",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
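For reference, here is a minimal sketch of a producer matching this setup (the bootstrap server and key value are assumptions): the key goes out as a plain string while the value carries the schema/payload envelope, serialized as a JSON string:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class UserProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The value is the schema/payload envelope that JsonConverter expects;
        // the key is a plain string, matching StringConverter on the sink.
        String value = "{\"schema\":{\"type\":\"struct\",\"fields\":["
                + "{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},"
                + "{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],"
                + "\"optional\":true,\"name\":\"user\"},"
                + "\"payload\":{\"id\":1,\"name\":\"Smith\"}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("user", "1", value));
        }
    }
}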