在 withValueSerde 中定义的 KafkaStreams 自定义 SerDes 被忽略
KafkaStreams custom SerDes defined in withValueSerde is ignored
我正在使用 Kafka Streams 2.6.0(在 Spring 引导中)并且 运行 遇到了非常奇怪的问题。我正在尝试对流执行有状态操作(分组和聚合):
freeTextSignPartialUpdateStream
.groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
.aggregate(
ArrayList::new,
freeTextSignUtils::updateFreeTextSignUpdateList,
Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
.withKeySerde(Serdes.Long())
.withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.toStream()
.to(
storesService.getFreeTextSignUpdatesStoreTopicName(),
Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
);
FTS_PARTIAL_UPDATE_LIST
是一个正确的 Serdes 实现,定义为常量(类似于 FTS_PARTIAL_UPDATE_MSG_SERDE
,它可以正常工作)。
奇怪的不是我收到 ser/des 错误,而是 withValueSerde
中定义的值被完全忽略,而是使用 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
(当前设置为 JsonSerdes
).
我在应用程序的其他流处理部分使用与上述类似的模式,没有任何问题。另外,当我用我的 FTS_PARTIAL_UPDATE_LIST
替换默认值 JsonSerdes
时,这有效。
我已经检查了 withValueSerde
文档,它说当输入值为 null
时它将回退到默认 Serdes,这显然不是(我已经在调试器中检查过)。
实际上,这是我的错误,因为 SerDes 实现中的解串器返回 null。
我正在使用 Kafka Streams 2.6.0(在 Spring 引导中)并且 运行 遇到了非常奇怪的问题。我正在尝试对流执行有状态操作(分组和聚合):
freeTextSignPartialUpdateStream
.groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
.aggregate(
ArrayList::new,
freeTextSignUtils::updateFreeTextSignUpdateList,
Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
.withKeySerde(Serdes.Long())
.withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.toStream()
.to(
storesService.getFreeTextSignUpdatesStoreTopicName(),
Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
);
FTS_PARTIAL_UPDATE_LIST
是一个正确的 Serdes 实现,定义为常量(类似于 FTS_PARTIAL_UPDATE_MSG_SERDE
,它可以正常工作)。
奇怪的不是我收到 ser/des 错误,而是 withValueSerde
中定义的值被完全忽略,而是使用 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
(当前设置为 JsonSerdes
).
我在应用程序的其他流处理部分使用与上述类似的模式,没有任何问题。另外,当我用我的 FTS_PARTIAL_UPDATE_LIST
替换默认值 JsonSerdes
时,这有效。
我已经检查了 withValueSerde
文档,它说当输入值为 null
时它将回退到默认 Serdes,这显然不是(我已经在调试器中检查过)。
实际上,这是我的错误,因为 SerDes 实现中的解串器返回 null。