为什么抑制功能不适用于跳跃 windows?

Why suppress functionality does not work with hopping windows?

如何让它发挥作用? 我的代码示例:

KStream<String, String> finalStream = source
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
                .reduce((aggValue, newValue) -> newValue, Materialized.with(Serdes.String(), Serdes.String()))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream();

在我 运行 上面的代码之后 - 输出流为空。没有 errors/exceptions。 注意:使用 Tumbling Window 代码按预期工作。 也许我只是使用不正确?

默认情况下,window宽限期为 24 小时。因此,如果您不更改它,suppress() 将不会在宽限期过去之前发出任何数据。

您可以通过 TimeWindows#grace(Duration) 缩短宽限期(参见:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html#grace-java.time.Duration-

顺便说一句:您可能还想减少这种情况下的存储保留时间(默认情况下也是 24 小时),通过:

reduce(..., Materialized.withRetention(Duration))