KStream/KTable leftjoin after transform() leads to : StreamsException: A serializer is not compatible to the actual key or value type
KStream/KTable leftjoin after transform() leads to : StreamsException: A serializer is not compatible to the actual key or value type
我写了以下 DSL:
myStream
.leftJoin(myKtable, new MyValueJoiner())
.groupByKey(Grouped.with(Serdes.String(),MyObject.serde()))
.reduce((v1, v2) -> v2, Materialized.as("MY_STORE"))
.toStream()
这工作正常,leftjoin() 没问题,reduce() 很好地具体化为状态存储,我可以在其上执行 put() 和 delete()。
然而,如果我编写 MyTranformer class 实现 Transformer 接口并执行以下操作:
myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
.leftJoin(myOtherKTable, new MyOtherValueJoiner<>());
然后我得到以下异常:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.MyObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
从 Javadoc 来看,leftjoin 似乎使用了默认的 serdes,而且似乎没有办法强制使用自定义 serdes,因为它可以用于其他操作数。
但是,如果我在 transform() 之后执行 leftjoin() 以外的操作,例如 mapValue() 或 filter(),它会按预期工作。但是一旦我执行 leftjoin() 我就会遇到转换异常。
我可以在 transform() 之后使用 leftjoin() 吗?
为什么在第一种情况下,即使文档说它使用默认的 serdes,leftjoin() 也能正常工作,而对于转换器它却失败了?
最后,我只是在 Javadoc 中错过了 leftJoin() 采用 Joined 参数来指定要使用的 serdes:
myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
.leftJoin(myOtherKTable, new MyOtherValueJoiner<>(), Joined.with(JoinedKey.serde(), JoinedValue.serde(), JoinedValueOutput.serde()));
我写了以下 DSL:
myStream
.leftJoin(myKtable, new MyValueJoiner())
.groupByKey(Grouped.with(Serdes.String(),MyObject.serde()))
.reduce((v1, v2) -> v2, Materialized.as("MY_STORE"))
.toStream()
这工作正常,leftjoin() 没问题,reduce() 很好地具体化为状态存储,我可以在其上执行 put() 和 delete()。
然而,如果我编写 MyTranformer class 实现 Transformer 接口并执行以下操作:
myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
.leftJoin(myOtherKTable, new MyOtherValueJoiner<>());
然后我得到以下异常:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.MyObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
从 Javadoc 来看,leftjoin 似乎使用了默认的 serdes,而且似乎没有办法强制使用自定义 serdes,因为它可以用于其他操作数。
但是,如果我在 transform() 之后执行 leftjoin() 以外的操作,例如 mapValue() 或 filter(),它会按预期工作。但是一旦我执行 leftjoin() 我就会遇到转换异常。
我可以在 transform() 之后使用 leftjoin() 吗?
为什么在第一种情况下,即使文档说它使用默认的 serdes,leftjoin() 也能正常工作,而对于转换器它却失败了?
最后,我只是在 Javadoc 中错过了 leftJoin() 采用 Joined 参数来指定要使用的 serdes:
myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
.leftJoin(myOtherKTable, new MyOtherValueJoiner<>(), Joined.with(JoinedKey.serde(), JoinedValue.serde(), JoinedValueOutput.serde()));