检查 window 是否由传递它的水印触发
Check if window is triggered by watermark passing it
如果我有这样的window:
.apply(Window
.<String>into(Sessions
.withGapDuration(Duration.standardSeconds(10)))
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(1))));
它接收数据:
a -> x (timestamp 0) (received at 20)
a -> y (timestamp 1) (received at 21)
(watermark passes 11) (at 22)
我想 window 有时会触发:
- 20 秒,因为提前开火
- 21 秒,因为提前开火
- 22 秒,因为水印超过了 GapDuration
在我对 windowed 数据应用的 ParDo
函数中,有没有办法区分早期触发和传递 GapDuration
的水印?
根据,无法获取水印。如果我能够做到这一点,我可以检查 max(timestamp) < watermark
。但是由于我无法获取水印,是否有任何其他方法可以确定 window 是由水印传递触发的。
您可以通过调用 ProcesContext#pane()
and use that to determine the Timing
访问 GroupByKey
之后的 DoFn
中元素的 PaneInfo
。这将允许您确定这是 "on time" 触发(由于水印通过了 window 的末尾)还是 speculative/late 触发。
如果我有这样的window:
.apply(Window
.<String>into(Sessions
.withGapDuration(Duration.standardSeconds(10)))
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(1))));
它接收数据:
a -> x (timestamp 0) (received at 20)
a -> y (timestamp 1) (received at 21)
(watermark passes 11) (at 22)
我想 window 有时会触发:
- 20 秒,因为提前开火
- 21 秒,因为提前开火
- 22 秒,因为水印超过了 GapDuration
在我对 windowed 数据应用的 ParDo
函数中,有没有办法区分早期触发和传递 GapDuration
的水印?
根据max(timestamp) < watermark
。但是由于我无法获取水印,是否有任何其他方法可以确定 window 是由水印传递触发的。
您可以通过调用 ProcesContext#pane()
and use that to determine the Timing
访问 GroupByKey
之后的 DoFn
中元素的 PaneInfo
。这将允许您确定这是 "on time" 触发(由于水印通过了 window 的末尾)还是 speculative/late 触发。