KStream suppress operator如何确定一个window的最后一条记录?

How KStream suppress operator determines the last record of a window?

下面是带有抑制运算符的 window 的简单定义:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .suppress(untilWindowCloses(unbounded())
  .toStream()
  // process last event here
  ... 

所以我的问题是,抑制运算符如何检测一个事件是否是 window 的最后一个事件?让我们想象一下,我删除了抑制运算符:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  ... 

我了解 KTable 的每次更改都会生成两个事件:

  1. 具有 null 值的记录以删除以前的记录
  2. 具有新价值的新记录

我想做的是删除 suppress 运算符并自己检测最后一条记录:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  .filter( /* detect the last record here */ )

此信息是否公开在 DSL 或处理器中 API?

信息只是间接暴露。 suppress() 运算符使用状态存储来跟踪以前收到的消息。这允许相互比较 old/new 消息并决定何时实际发出某些东西。

请注意,无状态 filter() 无法实现这一点。如果你想了解细节,你需要阅读源代码。

但主要问题是:您为什么要首先删除 suppress()