无法将 Avro 特定记录反序列化为 Spring Cloud Stream 中的通用记录
Not able to deserialize Avro specific record as a generic record in Spring Cloud Stream
我想要一个带有 Spring Cloud Stream 的通用消费者,我不需要在编译时专门指定 Avro 消息的架构。由于模式 ID 包含在消息中,因此应该可以使用模式 ID 将其反序列化为对象或通用记录(这可以在 Spring Cloud Stream 框架之外轻松完成)。为此,我尝试使用与我们用于拥有特定记录的 spring 云流应用程序相同的方法。很明显,这种方法行不通,因为我需要在消费时指定记录类型。它抛出以下异常:
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.example.avro.model.InputModel specified in writer's schema whilst finding reader's schema for a SpecificRecord.
我想知道是否有一种方法可以在编译时不知道架构(和消息类型)的情况下使用来自主题的消息。
消费者片段:
@StreamListener(Processor.INPUT)
public void handleMessage(Object message) {
...
}
P.S:我正在使用 io.confluent.kafka.serializers.KafkaAvroDeserializer
作为值反序列化器。
我能够通过设置 spring.kafka.properties.specific.avro.reader= false
以及处理程序的以下方法签名来解决此问题:
@StreamListener(Processor.INPUT)
public void handleMessage(GenericRecord record) {
...
}
我见过类似的情况,在我的 yml 中,指定了 KafkaAvroDeserializer
consumer-properties:
key.deserializer:org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
然后在应用中,
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public void hand(Object payload) throws IOException {
GenericRecord record = (GenericRecord) payload;
}
我想要一个带有 Spring Cloud Stream 的通用消费者,我不需要在编译时专门指定 Avro 消息的架构。由于模式 ID 包含在消息中,因此应该可以使用模式 ID 将其反序列化为对象或通用记录(这可以在 Spring Cloud Stream 框架之外轻松完成)。为此,我尝试使用与我们用于拥有特定记录的 spring 云流应用程序相同的方法。很明显,这种方法行不通,因为我需要在消费时指定记录类型。它抛出以下异常:
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.example.avro.model.InputModel specified in writer's schema whilst finding reader's schema for a SpecificRecord.
我想知道是否有一种方法可以在编译时不知道架构(和消息类型)的情况下使用来自主题的消息。
消费者片段:
@StreamListener(Processor.INPUT)
public void handleMessage(Object message) {
...
}
P.S:我正在使用 io.confluent.kafka.serializers.KafkaAvroDeserializer
作为值反序列化器。
我能够通过设置 spring.kafka.properties.specific.avro.reader= false
以及处理程序的以下方法签名来解决此问题:
@StreamListener(Processor.INPUT)
public void handleMessage(GenericRecord record) {
...
}
我见过类似的情况,在我的 yml 中,指定了 KafkaAvroDeserializer
consumer-properties:
key.deserializer:org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
然后在应用中,
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public void hand(Object payload) throws IOException {
GenericRecord record = (GenericRecord) payload;
}