KStream/KTable leftjoin after transform() leads to : StreamsException: A serializer is not compatible to the actual key or value type

KStream/KTable leftjoin after transform() leads to : StreamsException: A serializer is not compatible to the actual key or value type

我写了以下 DSL:

myStream
        .leftJoin(myKtable, new MyValueJoiner())
        .groupByKey(Grouped.with(Serdes.String(),MyObject.serde()))
        .reduce((v1, v2) -> v2, Materialized.as("MY_STORE"))
        .toStream()

这工作正常,leftjoin() 没问题,reduce() 很好地具体化为状态存储,我可以在其上执行 put() 和 delete()。

然而,如果我编写 MyTranformer class 实现 Transformer 接口并执行以下操作:

myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
        .leftJoin(myOtherKTable, new MyOtherValueJoiner<>());

然后我得到以下异常:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.MyObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)

从 Javadoc 来看,leftjoin 似乎使用了默认的 serdes,而且似乎没有办法强制使用自定义 serdes,因为它可以用于其他操作数。

但是,如果我在 transform() 之后执行 leftjoin() 以外的操作,例如 mapValue() 或 filter(),它会按预期工作。但是一旦我执行 leftjoin() 我就会遇到转换异常。

我可以在 transform() 之后使用 leftjoin() 吗?

为什么在第一种情况下,即使文档说它使用默认的 serdes,leftjoin() 也能正常工作,而对于转换器它却失败了?

最后,我只是在 Javadoc 中错过了 leftJoin() 采用 Joined 参数来指定要使用的 serdes:

myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
    .leftJoin(myOtherKTable, new MyOtherValueJoiner<>(), Joined.with(JoinedKey.serde(), JoinedValue.serde(), JoinedValueOutput.serde()));