为什么抑制功能不适用于跳跃 windows?
Why suppress functionality does not work with hopping windows?
如何让它发挥作用?
我的代码示例:
KStream<String, String> finalStream = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
.reduce((aggValue, newValue) -> newValue, Materialized.with(Serdes.String(), Serdes.String()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
在我 运行 上面的代码之后 - 输出流为空。没有 errors/exceptions。
注意:使用 Tumbling Window 代码按预期工作。
也许我只是使用不正确?
默认情况下,window宽限期为 24 小时。因此,如果您不更改它,suppress()
将不会在宽限期过去之前发出任何数据。
您可以通过 TimeWindows#grace(Duration)
缩短宽限期(参见:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html#grace-java.time.Duration-)
顺便说一句:您可能还想减少这种情况下的存储保留时间(默认情况下也是 24 小时),通过:
reduce(..., Materialized.withRetention(Duration))
如何让它发挥作用? 我的代码示例:
KStream<String, String> finalStream = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
.reduce((aggValue, newValue) -> newValue, Materialized.with(Serdes.String(), Serdes.String()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
在我 运行 上面的代码之后 - 输出流为空。没有 errors/exceptions。 注意:使用 Tumbling Window 代码按预期工作。 也许我只是使用不正确?
默认情况下,window宽限期为 24 小时。因此,如果您不更改它,suppress()
将不会在宽限期过去之前发出任何数据。
您可以通过 TimeWindows#grace(Duration)
缩短宽限期(参见:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html#grace-java.time.Duration-)
顺便说一句:您可能还想减少这种情况下的存储保留时间(默认情况下也是 24 小时),通过:
reduce(..., Materialized.withRetention(Duration))