Kafka DSL Kstream->Ktable Join - Joined Serialization compile Error
Kafka DSL Kstream->Ktable Join - Joined Serialization compile Error
我正在尝试根据此 kafka documentation 实现此连接。
我不知道为什么这个连接不起作用...
首先我传递了所有值。
这里试图强制泛型为Object类型,这是非常错误的。
没有加入序列化选项我收到这个运行时异常:
Exception in thread "StreamAPP-stream-event-b3dc5fff-abee-4fa0-92f9-e1690f8fd152-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic StreamAPP-stream-event-KSTREAM-KEY-SELECT-0000000025-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: br.com.calebebrim.kafka.entities.stream.sharing.registry.StreamRegistryKey / value type: br.com.calebebrim.kafka.entities.stream.sharing.stream.Event). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced)
with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
)
谁能帮帮我?
谢谢!
已解决,
我发现连接操作不能转换数据。
所以,我只是在之前应用了 mapValues:
我正在尝试根据此 kafka documentation 实现此连接。
我不知道为什么这个连接不起作用...
首先我传递了所有值。
没有加入序列化选项我收到这个运行时异常:
Exception in thread "StreamAPP-stream-event-b3dc5fff-abee-4fa0-92f9-e1690f8fd152-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic StreamAPP-stream-event-KSTREAM-KEY-SELECT-0000000025-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: br.com.calebebrim.kafka.entities.stream.sharing.registry.StreamRegistryKey / value type: br.com.calebebrim.kafka.entities.stream.sharing.stream.Event). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL,
#to(String topic, Produced<K, V> produced)
withProduced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
)
谁能帮帮我?
谢谢!
已解决,
我发现连接操作不能转换数据。
所以,我只是在之前应用了 mapValues: