KStream suppress operator如何确定一个window的最后一条记录?
How KStream suppress operator determines the last record of a window?
下面是带有抑制运算符的 window 的简单定义:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.suppress(untilWindowCloses(unbounded())
.toStream()
// process last event here
...
所以我的问题是,抑制运算符如何检测一个事件是否是 window 的最后一个事件?让我们想象一下,我删除了抑制运算符:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
...
我了解 KTable
的每次更改都会生成两个事件:
- 具有
null
值的记录以删除以前的记录
- 具有新价值的新记录
我想做的是删除 suppress
运算符并自己检测最后一条记录:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
.filter( /* detect the last record here */ )
此信息是否公开在 DSL 或处理器中 API?
信息只是间接暴露。 suppress()
运算符使用状态存储来跟踪以前收到的消息。这允许相互比较 old/new 消息并决定何时实际发出某些东西。
请注意,无状态 filter()
无法实现这一点。如果你想了解细节,你需要阅读源代码。
但主要问题是:您为什么要首先删除 suppress()
?
下面是带有抑制运算符的 window 的简单定义:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.suppress(untilWindowCloses(unbounded())
.toStream()
// process last event here
...
所以我的问题是,抑制运算符如何检测一个事件是否是 window 的最后一个事件?让我们想象一下,我删除了抑制运算符:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
...
我了解 KTable
的每次更改都会生成两个事件:
- 具有
null
值的记录以删除以前的记录 - 具有新价值的新记录
我想做的是删除 suppress
运算符并自己检测最后一条记录:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
.filter( /* detect the last record here */ )
此信息是否公开在 DSL 或处理器中 API?
信息只是间接暴露。 suppress()
运算符使用状态存储来跟踪以前收到的消息。这允许相互比较 old/new 消息并决定何时实际发出某些东西。
请注意,无状态 filter()
无法实现这一点。如果你想了解细节,你需要阅读源代码。
但主要问题是:您为什么要首先删除 suppress()
?