在 withValueSerde 中定义的 KafkaStreams 自定义 SerDes 被忽略

KafkaStreams custom SerDes defined in withValueSerde is ignored

我正在使用 Kafka Streams 2.6.0(在 Spring 引导中)并且 运行 遇到了非常奇怪的问题。我正在尝试对流执行有状态操作(分组和聚合):

        freeTextSignPartialUpdateStream
                .groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
                .aggregate(
                        ArrayList::new,
                        freeTextSignUtils::updateFreeTextSignUpdateList,
                        Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
                                .withCachingDisabled()
                                .withLoggingDisabled()
                )
                .toStream()
                .to(
                        storesService.getFreeTextSignUpdatesStoreTopicName(),
                        Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
                );

FTS_PARTIAL_UPDATE_LIST 是一个正确的 Serdes 实现,定义为常量(类似于 FTS_PARTIAL_UPDATE_MSG_SERDE,它可以正常工作)。

奇怪的不是我收到 ser/des 错误,而是 withValueSerde 中定义的值被完全忽略,而是使用 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG(当前设置为 JsonSerdes).

我在应用程序的其他流处理部分使用与上述类似的模式,没有任何问题。另外,当我用我的 FTS_PARTIAL_UPDATE_LIST 替换默认值 JsonSerdes 时,这有效。

我已经检查了 withValueSerde 文档,它说当输入值为 null 时它将回退到默认 Serdes,这显然不是(我已经在调试器中检查过)。

实际上,这是我的错误,因为 SerDes 实现中的解串器返回 null。