
kafka connect JdbcSourceConnector deserialization issue

I'm using Kafka Connect to load data from a database into a compacted topic, and I'm hitting a deserialization issue when I try to consume that topic in a Spring Cloud Stream application.

Connector configuration:

    {
      "name": "my-connector",
      "config": {
        "name": "my-connector",
        "poll.interval.ms": "86400000",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "oracle-jdbc-string",
        "connection.user": "testid",
        "connection.password": "test",
        "catalog.pattern": "mySchema",
        "table.whitelist": "MY_TABLE",
        "table.types": "TABLE",
        "mode": "bulk",
        "numeric.mapping": "best_fit",
        "transforms": "createKey, extractCode, UpdateTopicName",
        "transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractCode.field": "ID",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "ID",
        "transforms.UpdateTopicName.regex": "(.*)",
        "transforms.UpdateTopicName.replacement": "my_topic",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "topic.prefix": "nt_mny_"
      }
    }

The connector seems to work fine and puts the expected messages on the topic. A sample message viewed with kafka-console-consumer looks like this:

kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning  --property print.key=true | jq '.'

7247
0
{
  "ID": 7247,
  "USER_SK": 5623,
  "TYP_CDE": "TEST",
  "ALRT_ACTIVE_FLAG": "Y",
  "ALRT_DESC": "My Alert",
  "ALRT_STATUS": "VISIBLE",
  "CREAT_BY": "ME",
  "CREAT_TM": 1593547299565,
  "UPD_BY": "ME",
  "UPD_TM": 1593547299565
}

I'd like to know whether the 0 printed between the key and the value is the problem, or just Kafka noise.

The error I see in my code is:

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)

My processor/sink code is fairly simple:

@StreamListener
public void process(
        @Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {

    myAlertKconnectStream.peek((key, value) -> {
        System.out.println("HELOOOOOO");
        logger.debug("ece/pre: key={}, value={}", key, value);
    });
}

I've spent days trying to figure this out with very little to go on; any help would be appreciated!

You are using the JSON Schema converter (io.confluent.connect.json.JsonSchemaConverter), not the plain JSON converter (org.apache.kafka.connect.json.JsonConverter).

The JSON Schema converter stores the schema in the Schema Registry and embeds a reference to it in the first few bytes of each message. Those prefix bytes are what break your code, because the JSON deserializer tries to read them as part of the document (Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)).
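This also explains the stray 0 you saw between the key and the value: it is the "magic byte" of Confluent's wire format, which prefixes the payload with one 0x00 byte followed by a 4-byte big-endian schema ID. A minimal sketch of decoding that prefix (the class name and the synthetic message bytes here are illustrative, not part of any Confluent API):

```java
import java.nio.ByteBuffer;

public class WireFormat {

    // Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian
    // schema ID + the actual serialized payload.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0x00) {
            throw new IllegalArgumentException(
                "Not Confluent wire format, magic byte = " + magic);
        }
        return buf.getInt(); // next 4 bytes: the Schema Registry ID
    }

    public static void main(String[] args) {
        // A hypothetical message: magic byte 0, schema ID 1, then JSON payload bytes.
        byte[] msg = ByteBuffer.allocate(7)
                .put((byte) 0x00)
                .putInt(1)
                .put("{}".getBytes())
                .array();
        System.out.println(schemaId(msg)); // prints 1
    }
}
```

A plain JSON parser hitting those five prefix bytes sees invalid characters at the start of the document, which matches the "at char #1, byte #7" position in your exception.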

So either use the JSON Schema deserialiser in your consuming code (better), or switch the connector to the org.apache.kafka.connect.json.JsonConverter converter (less preferable, since you then throw away the schema).
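For the first option, the Schema-Registry-aware deserializer is io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer (from Confluent's kafka-json-schema-serializer artifact); it strips the 5-byte prefix and validates against the registered schema before handing you the JSON. A sketch of the consumer-side properties, assuming your broker and registry addresses (the group id here is hypothetical):

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    public static Properties props() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // assumption: your broker
        p.put("group.id", "my-alert-consumer");          // hypothetical group id
        p.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
        // Schema-Registry-aware JSON deserializer: handles the magic byte
        // and schema ID prefix that the plain JSON deserializer chokes on.
        p.put("value.deserializer",
              "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
        p.put("schema.registry.url", "http://schema-registry:8081");
        return p;
    }
}
```

In a Spring Cloud Stream app you would supply the equivalent settings through the binder's consumer properties rather than building a Properties object by hand; the key point is that the value deserializer must be the Schema-Registry-aware one.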

More details: https://rmoff.net/2020/07/03/why-json-isnt-the-same-as-json-schema-in-kafka-connect-converters-and-ksqldb-viewing-kafka-messages-bytes-as-hex/