使用触发器限制 window 的最大长度
Limiting the max length of window using triggers
我正在研究如何限制会话的最大长度。在做触发的时候。我当前的触发器如下所示:
return AfterEach.inOrder(
// speculatively trigger
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval).orFinally(AfterWatermark.pastEndOfWindow())),
// finally trigger for late
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)));
这很好用,因为它每 earlyFiringInterval
个时间单位触发一次,直到水印超过 window 的末尾,然后每隔 lateFiringInterval
个时间单位触发一次。
不幸的是,一个会话可能会持续数天,这会导致 window 保持打开很长时间并导致水印被保留。我正在尝试构建一个可以 "cut" window 的触发器,以便:
- 任何会话都不能超过某个
maxSessionLength
时间(在活动时间中)。
- 或者,将会话限制为窗格中的一些
maxSessionLength
事件。 - 这是因为它处于累积模式。 (不理想)
到目前为止,我有:
return AfterEach.inOrder(
Repeatedly
// speculatively trigger at every 'earlyFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval)
// terminate trigger when any of the following conditions are met:
// * We have collected either 'maxEventCount' events in the pane
// * Watermark has passed the window
.orFinally(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventCount), AfterWatermark.pastEndOfWindow()))),
Repeatedly
// trigger for late data at every 'lateFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)))
.orFinally(AfterPane.elementCountAtLeast(maxEventCount));
我想知道这是要走的路还是有更好的方法 "limit the window size"。
您可以允许水印前进,同时通过指定 OutputTimeFn
来保持会话的完全保真度:
Window.into(Sessions.withGapDuration(...))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())
就像 CombineFn
确定从分组转换输出的 值 (我们可以将 GroupByKey
视为通过连接组合),一个OutputTimeFn
确定分组转换输出的 时间戳 。
SDK提供了一些常用的选择:
OutputTimeFns.outputAtEndOfWindow()
OutputTimeFns.outputAtEarliestInputTimestamp()
OutputTimeFns.outputAtLatestInputTimestamp()
今天的默认设置是 outputAtEarliestInputTimestamp()
,这在您可以将哪些时间戳应用于下游生成的元素方面提供了最大的灵活性,但不幸的是,以(必要的)成本来阻止水印。
如果您不打算在 window 内的时间戳显式输出,选择 outputAtEndOfWindow()
可以让水印尽快推进。
注意:此功能标记为Experimental
。这意味着它的 API 可能会改变(例如,不是接受任意 OutputTimeFn
实现,它可能被限制为几个固定常量)。这个概念几乎肯定会保留下来,因为我们总是需要为分组转换的输出决定一个时间戳。
如果您出于其他原因仍想缩短会话,请发表评论,我将详细说明其他选项。
顺便说一句,我强烈推荐我们现在提供的简化触发器语法:
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(earlyFiringInterval))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(lateFiringInterval))
我正在研究如何限制会话的最大长度。在做触发的时候。我当前的触发器如下所示:
return AfterEach.inOrder(
// speculatively trigger
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval).orFinally(AfterWatermark.pastEndOfWindow())),
// finally trigger for late
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)));
这很好用,因为它每 earlyFiringInterval
个时间单位触发一次,直到水印超过 window 的末尾,然后每隔 lateFiringInterval
个时间单位触发一次。
不幸的是,一个会话可能会持续数天,这会导致 window 保持打开很长时间并导致水印被保留。我正在尝试构建一个可以 "cut" window 的触发器,以便:
- 任何会话都不能超过某个
maxSessionLength
时间(在活动时间中)。 - 或者,将会话限制为窗格中的一些
maxSessionLength
事件。 - 这是因为它处于累积模式。 (不理想)
到目前为止,我有:
return AfterEach.inOrder(
Repeatedly
// speculatively trigger at every 'earlyFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval)
// terminate trigger when any of the following conditions are met:
// * We have collected either 'maxEventCount' events in the pane
// * Watermark has passed the window
.orFinally(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventCount), AfterWatermark.pastEndOfWindow()))),
Repeatedly
// trigger for late data at every 'lateFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)))
.orFinally(AfterPane.elementCountAtLeast(maxEventCount));
我想知道这是要走的路还是有更好的方法 "limit the window size"。
您可以允许水印前进,同时通过指定 OutputTimeFn
来保持会话的完全保真度:
Window.into(Sessions.withGapDuration(...))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())
就像 CombineFn
确定从分组转换输出的 值 (我们可以将 GroupByKey
视为通过连接组合),一个OutputTimeFn
确定分组转换输出的 时间戳 。
SDK提供了一些常用的选择:
OutputTimeFns.outputAtEndOfWindow()
OutputTimeFns.outputAtEarliestInputTimestamp()
OutputTimeFns.outputAtLatestInputTimestamp()
今天的默认设置是 outputAtEarliestInputTimestamp()
,这在您可以将哪些时间戳应用于下游生成的元素方面提供了最大的灵活性,但不幸的是,以(必要的)成本来阻止水印。
如果您不打算在 window 内的时间戳显式输出,选择 outputAtEndOfWindow()
可以让水印尽快推进。
注意:此功能标记为Experimental
。这意味着它的 API 可能会改变(例如,不是接受任意 OutputTimeFn
实现,它可能被限制为几个固定常量)。这个概念几乎肯定会保留下来,因为我们总是需要为分组转换的输出决定一个时间戳。
如果您出于其他原因仍想缩短会话,请发表评论,我将详细说明其他选项。
顺便说一句,我强烈推荐我们现在提供的简化触发器语法:
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(earlyFiringInterval))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(lateFiringInterval))