How to solve Kafka Connect JSONConverter "Schema must contain 'type' field"
I'm trying to push the following message to a JdbcSink:
{
"schema": {
"type": "struct",
"fields": [{
"field": "ID",
"type": {
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
}, {
"field": "STORE_DATE",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}],
"default": null
}, {
"field": "DATA",
"type": ["null", "string"],
"default": null
}],
"name": "KAFKA_STREAM"
},
"payload": {
"ID": 17,
"STORE_DATE": null,
"DATA": "THIS IS TEST DATA"
}
}
But it keeps throwing the error: Caused by: org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field
This is the connector configuration currently in use:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "DEV_KAFKA_STREAM",
"connection.url": "url",
"connection.user": "user",
"connection.password": "password",
"insert.mode": "insert",
"table.name.format": "KAFKA_STREAM",
"pk.fields": "ID",
"auto.create": "false",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
I'm not sure how to debug this or how to find the root cause, since the JSON clearly does have a "type" field.
As far as I know, "long" is not a valid schema type; you want "int64".
You'll probably also want to remove the unions. There is an "optional" key for marking a field as nullable.
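Putting those fixes together, the envelope would look something like this. This is a sketch, not a verified working message: the union arrays are replaced with plain string types plus "optional": true, and the "connect.name"/"connect.version"/"connect.parameters" keys are rewritten as the "name"/"version"/"parameters" keys that JsonConverter reads:

```json
{
  "schema": {
    "type": "struct",
    "name": "KAFKA_STREAM",
    "fields": [
      {
        "field": "ID",
        "type": "bytes",
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": { "scale": "0" }
      },
      {
        "field": "STORE_DATE",
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1
      },
      { "field": "DATA", "type": "string", "optional": true }
    ]
  },
  "payload": {
    "ID": 17,
    "STORE_DATE": null,
    "DATA": "THIS IS TEST DATA"
  }
}
```

One caveat: JsonConverter serializes Decimal values as base64-encoded unscaled bytes, so the plain 17 in the payload may also need to change once the schema parses.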
If you're creating the JSON in Java, you should use SchemaBuilder and the Envelope class around the two JsonNode objects to make sure the payload is built correctly.
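If you can't change the producer right away, you can at least pinpoint which schema nodes trigger the error. JsonConverter throws "Schema must contain 'type' field" whenever a schema node's "type" entry is missing or is not a plain string, which is exactly what a union array like ["null", "string"] is. A minimal sketch of such a check (plain Python, hypothetical helper name):

```python
import json

def find_bad_type_nodes(schema, path="schema"):
    """Walk a Connect JSON schema and report every node whose 'type'
    is missing or not a plain string -- the condition behind
    JsonConverter's "Schema must contain 'type' field" error."""
    problems = []
    if not isinstance(schema.get("type"), str):
        problems.append(path)
    for f in schema.get("fields", []):
        # Each field object is itself a schema node; union arrays and
        # nested type objects both fail the plain-string check.
        problems += find_bad_type_nodes(f, path + "." + str(f.get("field")))
    return problems

# Trimmed version of the failing envelope from the question
envelope = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "STORE_DATE",
       "type": ["null", {"type": "long"}],
       "default": null},
      {"field": "DATA", "type": ["null", "string"], "default": null}
    ],
    "name": "KAFKA_STREAM"
  },
  "payload": {"STORE_DATE": null, "DATA": "THIS IS TEST DATA"}
}
""")
print(find_bad_type_nodes(envelope["schema"]))
# Both union-typed fields are flagged: ['schema.STORE_DATE', 'schema.DATA']
```

Run against the full message from the question, this also flags the ID field, whose "type" is a nested object rather than a string.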