kafka connect JdbcSourceConnector反序列化问题
kafka connect JdbcSourceConnector deserialization issue
我正在使用 kafka connect 连接到数据库以存储有关压缩主题的信息,并且在尝试在 spring 云流应用程序中使用该主题时遇到反序列化问题。
连接器配置:
{
"name": "my-connector",
"config": {
"name": "my-connector",
"poll.interval.ms": "86400000",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "oracle-jdbc-string",
"connection.user": "testid",
"connection.password": "test",
"catalog.pattern": "mySchema",
"table.whitelist": "MY_TABLE",
"table.types": "TABLE",
"mode": "bulk",
"numeric.mapping": "best_fit",
"transforms": "createKey, extractCode, UpdateTopicName",
"transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractCode.field": "ID",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.UpdateTopicName.regex": "(.*)",
"transforms.UpdateTopicName.replacement": "my_topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"topic.prefix": "nt_mny_"
}
}
连接器似乎工作正常,并在主题上放置了适当的消息,使用 kafka-console-consumer 时示例消息如下所示
kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning --property print.key=true | jq '.'
7247
0
{
"ID": 7247,
"USER_SK": 5623,
"TYP_CDE": "TEST",
"ALRT_ACTIVE_FLAG": "Y",
"ALRT_DESC": "My Alert",
"ALRT_STATUS": "VISIBLE",
"CREAT_BY": "ME",
"CREAT_TM": 1593547299565,
"UPD_BY": "ME",
"UPD_TM": 1593547299565
}
我想知道打印在键和值之间的 0 是问题还是只是卡夫卡噪声。
我在代码中看到的问题是
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
而我的processor/sink代码比较简单。
@StreamListener
public void process(
@Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {
myAlertKconnectStream.peek((key,value) -> {
System.out.println("HELOOOOOO");
logger.debug("ece/pre: key={}, value={}",key,value);});
}
我花了好几天时间试图解决这个问题,但几乎没有任何帮助,我们将不胜感激!
您使用的是 JSON 架构转换器 (io.confluent.connect.json.JsonSchemaConverter
),而不是 JSON 转换器 (org.apache.kafka.connect.json.JsonConverter
)。
JSON 模式转换器使用模式注册表来存储模式,并将有关它的信息放在消息的前几个字节中。这就是使您的代码出错的原因 (Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
)。
因此,要么在您的代码中使用 JSON Schema deserialiser(更好),要么切换到使用 org.apache.kafka.connect.json.JsonConverter
转换器(不太可取;然后您会丢弃架构)。
我正在使用 kafka connect 连接到数据库以存储有关压缩主题的信息,并且在尝试在 spring 云流应用程序中使用该主题时遇到反序列化问题。
连接器配置:
{
"name": "my-connector",
"config": {
"name": "my-connector",
"poll.interval.ms": "86400000",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "oracle-jdbc-string",
"connection.user": "testid",
"connection.password": "test",
"catalog.pattern": "mySchema",
"table.whitelist": "MY_TABLE",
"table.types": "TABLE",
"mode": "bulk",
"numeric.mapping": "best_fit",
"transforms": "createKey, extractCode, UpdateTopicName",
"transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractCode.field": "ID",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.UpdateTopicName.regex": "(.*)",
"transforms.UpdateTopicName.replacement": "my_topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"topic.prefix": "nt_mny_"
}
}
连接器似乎工作正常,并在主题上放置了适当的消息,使用 kafka-console-consumer 时示例消息如下所示
kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning --property print.key=true | jq '.'
7247
0
{
"ID": 7247,
"USER_SK": 5623,
"TYP_CDE": "TEST",
"ALRT_ACTIVE_FLAG": "Y",
"ALRT_DESC": "My Alert",
"ALRT_STATUS": "VISIBLE",
"CREAT_BY": "ME",
"CREAT_TM": 1593547299565,
"UPD_BY": "ME",
"UPD_TM": 1593547299565
}
我想知道打印在键和值之间的 0 是问题还是只是卡夫卡噪声。
我在代码中看到的问题是
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
而我的processor/sink代码比较简单。
@StreamListener
public void process(
@Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {
myAlertKconnectStream.peek((key,value) -> {
System.out.println("HELOOOOOO");
logger.debug("ece/pre: key={}, value={}",key,value);});
}
我花了好几天时间试图解决这个问题,但几乎没有任何帮助,我们将不胜感激!
您使用的是 JSON 架构转换器 (io.confluent.connect.json.JsonSchemaConverter
),而不是 JSON 转换器 (org.apache.kafka.connect.json.JsonConverter
)。
JSON 模式转换器使用模式注册表来存储模式,并将有关它的信息放在消息的前几个字节中。这就是使您的代码出错的原因 (Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
)。
因此,要么在您的代码中使用 JSON Schema deserialiser(更好),要么切换到使用 org.apache.kafka.connect.json.JsonConverter
转换器(不太可取;然后您会丢弃架构)。