Kafka Streams - 抑制直到 Window 结束(不关闭)

Kafka Streams - Suppress until Window End (not Close)

我正在 windowed 流上执行聚合并希望抑制早期聚合结果。我所说的早期结果是指在 window 结束之前计算的结果,而不是宽限期内发生的结果。因此,我想抑制时间戳 < window 结束的所有聚合结果,但转发时间戳 >= window 结束和时间戳 < window 关闭的所有记录。

最小的 Kafka Streams 拓扑示例:

new StreamsBuilder()
        .stream("my-topic")
        .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
        .reduce(myReducer)
        .suppress( /* searched for*/ )
        .toStream();

因此,Suppressed.untilWindowCloses( .. ) 对我来说不是一个选择,因为我必须等到宽限期到期,这可能会很长。

根据 KIP-328,使用 Suppressed.untilTimeLimit(Duration.ZERO, .. ) 可以准确地获得所需的行为(引用自 KIP 的描述):

a. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer up each key before emitting them downstream.

但是 Kafka Streams JavaDoc as well as the corresponding implementation 暗示情况并非如此,时间限制在每个 (windowed-) 键收到第一条记录时开始倒计时,而不是 window结束。

我很高兴对此进行澄清并支持如何实现所需的行为。

KIP 描述不正确(我相应地更新了 wiki 页面)。请注意,KIP 进一步向下显示:

Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

因此,使用untilTimeLimit用于定期发射。对于 windowed-aggregation,间隔计时器将在 window 开始时间开始——您仍然可以将等待时间设置为 "window size" 以不获取任何 "early" 更新,但是您不会在 window-end 之后看到所有更新,而只会看到 "window size intervals" 中的更新。如果你的宽限期真的很长,这可能还不够好?

您描述的用例目前不受支持,但我认为这是一个非常有趣且有用的用例。也许您可以创建功能请求票?