KafkaStream不使用Consumed.with()中给出的serde,而是使用默认的serde

KafkaStream does not use the serde given in Consumed.with(), but uses the default serde

我创建了从 kafka 消费的 Serde,如下所示

import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, JsonNode> eventStream = builder
                .stream("my-test-1",
                        Consumed.with(Serdes.String(), jsonNodeSerde)

但仍然收到序列化错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.fasterxml.jackson.databind.node.ObjectNode). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

既然已经提供了Consumed.with(),为什么还要使用默认的serde呢?作为这里写的答案,这应该有效,或者?

是的,问题是您的数据与 serdes 不匹配。

A serializer (key: org.apache.kafka.common.serialization.StringSerializer /
              value: org.apache.kafka.common.serialization.ByteArraySerializer)
is not compatible to the actual key or value type
             (key type: java.lang.String /
              value type: com.fasterxml.jackson.databind.node.ObjectNode).

但是,错误消息说问题是在数据被序列化时引起的,即当Kafka Streams试图写入数据时.

但是,您的带有 Consumed 的代码片段是关于 反序列化 并因此 读取 数据。因此,问题似乎不是由您在问题中共享的代码片段引起的,而是由您的 Java 文件中可能更靠后的代码引起的,您的问题中未显示该代码。 (顺便说一句,如果您提供了错误的完整堆栈跟踪,将会有所帮助。)