如何为 Kafka KeyValueStore 设置值 de/serializer?
How do I set the value de/serializer for a Kafka KeyValueStore?
我将来自 Kafka 主题的消息存储在 KeyValueStore 中,以便稍后查询它们。我创建一个 KTable 如下:
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {
我在 application.yml 中配置消费者如下:
已更新de/serializer 程序包
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer
但是,当我从 KeyValueStore 中读取时,键作为字符串正确返回,但返回的值是字节数组而不是 MyMessage。出于某种原因,我的自定义解串器没有被使用。我试图自己反序列化消息,但我的反序列化器因异常而崩溃。我在我的序列化器上放置了一个断点,但它从未被调用过。我很清楚我的序列化器或反序列化器都没有被使用。
我缺少什么配置才能使用我的自定义值 de/serializer? de/serializer 是否需要位于特定包中才能找到?
在 application.yml 中使用了错误的配置密钥。而不是 key-deserializer: 它应该是 keySerde: 而不是 value-deserializer: 它应该是 valueSerde。以下是正确的配置:
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
我将来自 Kafka 主题的消息存储在 KeyValueStore 中,以便稍后查询它们。我创建一个 KTable 如下:
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {
我在 application.yml 中配置消费者如下:
已更新de/serializer 程序包
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer
但是,当我从 KeyValueStore 中读取时,键作为字符串正确返回,但返回的值是字节数组而不是 MyMessage。出于某种原因,我的自定义解串器没有被使用。我试图自己反序列化消息,但我的反序列化器因异常而崩溃。我在我的序列化器上放置了一个断点,但它从未被调用过。我很清楚我的序列化器或反序列化器都没有被使用。
我缺少什么配置才能使用我的自定义值 de/serializer? de/serializer 是否需要位于特定包中才能找到?
在 application.yml 中使用了错误的配置密钥。而不是 key-deserializer: 它应该是 keySerde: 而不是 value-deserializer: 它应该是 valueSerde。以下是正确的配置:
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde