Kafka-streams application error using mapValues() method with Gson

I wrote a Kafka Streams application that reads data from the topic "topic_one" (the data originally comes from MySQL). I then want to take one part of this data (the "after" section, see below) as a KStream and do further operations on it. But I get a serialization error when I use mapValues(). I am new to Kafka Streams and don't know how to build and use a proper serde. Can anybody help me?

Source data coming into topic_one:

[KSTREAM-SOURCE-0000000000]: null, {"before": null, "after": {"id": 1, "category": 1, "item": "abc"}, "source": {"version": "0.8.3.Final", "name": "example", "server_id": 1, "ts_sec": 1581491071, "gtid": null, "file": "mysql-bin.000013", "pos": 217827349, "row": 0, "snapshot": false, "thread": 95709, "db": "example", "table": "item", "query": null}, "op": "c", "ts_ms": 1581491071727}

What I want to get:

{"id": 1, "category": 1, "item": "abc"}

My code:

    public static void main(String[] args) {

        Properties properties = getProperties();

        try {
            StreamsBuilder builder = new StreamsBuilder();

            KStream<String, String> resourceStream = builder.stream("topic_one");
            resourceStream.print(Printed.toSysOut());

            // extract the "after" object from the change event
            KStream<String, String> resultStream = resourceStream.mapValues(value ->
                    new Gson().fromJson(value, JsonObject.class).get("after").getAsJsonObject().toString());
            resultStream.print(Printed.toSysOut());

            Topology topology = builder.build();

            KafkaStreams streams = new KafkaStreams(topology, properties);

            streams.cleanUp();
            streams.start();

        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    private static Properties getProperties() {

        Properties properties = new Properties(); // TODO: move these settings to a separate config file?

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put("schema.registry.url", "http://localhost:8081");

        return properties;
    }

The error:

    Exception in thread "streams_id-db618fbf-c3e4-468b-a5a2-18e6b0b9c6be-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=matomo.matomo.matomo_scenarios_directory, partition=0, offset=30, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
    Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
    Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    ... 10 more

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
    Caused by: org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
    Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    ... 5 more
    Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    ... 10 more

In your getProperties() method you define the value serde as GenericAvroSerde.class, but when you create the streams you use String as the value type. That is why you get the ClassCastException at runtime:

    KStream<String, String> resourceStream = ...
    KStream<String, String> resultStream = ...

If you were really using Avro as the message format, you would have to use the matching types when defining your KStreams (a sketch of that variant follows at the end of this answer). But since it looks like you only have JSON strings as values, you can set the correct value serde by replacing

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

with

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
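Alternatively, you can leave the defaults untouched and pass the correct serdes explicitly where the stream is defined, which is the "provide correct Serdes via method parameters" option the error message mentions. A minimal sketch, assuming the values really are plain strings:

    // import org.apache.kafka.streams.kstream.Consumed;
    // Per-source serdes override the defaults configured in StreamsConfig.
    KStream<String, String> resourceStream =
            builder.stream("topic_one", Consumed.with(Serdes.String(), Serdes.String()));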

Hope this helps.
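For completeness, here is the Avro variant mentioned above. The GenericData$Record in your stack trace suggests the topic may actually carry Avro, in which case you would keep GenericAvroSerde as the default value serde and type the stream against GenericRecord instead. This is only a sketch, assuming a Debezium-style envelope with a nested "after" record:

    // import org.apache.avro.generic.GenericRecord;
    // Assumes the default value serde is still GenericAvroSerde
    // (with schema.registry.url configured).
    KStream<String, GenericRecord> source = builder.stream("topic_one");

    KStream<String, String> result = source.mapValues(record -> {
        Object after = record.get("after");              // nested Avro record; null for deletes
        return after == null ? null : after.toString();  // GenericRecord#toString() renders JSON
    });
    result.print(Printed.toSysOut());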