指定正确触发 10 分钟 window + 5 分钟延迟缓冲区仅产生 1 个结果
Specifying correct Triggering for 10 minutes window + 5 minutes lateness buffer producing only 1 result
我正在创建一个管道,它摄取无限数据源并进行聚合计算。根据事件时间和迟到事件的 5 分钟缓冲区,计算在 10 分钟 window 内完成。
我希望在 10 分钟 window 和 5 分钟缓冲区通过后仅发出一次聚合结果。
我不知道如何让 window 只发出一次结果。我相信正确的方法是使用 AfterWatermark
触发器,但如果我使用 withLateFirings()
,结果将在 window 传递后和延迟触发持续时间传递后发出两次。如果不使用延迟触发,延迟事件将不会被计算在内,这不符合我的要求。
public class WindowFactory {
private static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
public static Window<Message> getMessageFixedWindow(Duration duration) {
return Window.<Message>into(FixedWindows.of(duration))
.triggering(
AfterWatermark
.pastEndOfWindow()
.withLateFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES)))
.discardingFiredPanes()
.withAllowedLateness(FIVE_MINUTES);
}
}
请建议我在 10 分钟 windows 和 5 分钟缓冲后只产生 1 个结果的好方法。
您现在设置的内容将触发两次,一次是在水印已经超过 window 的末尾时,一次是在延迟数据缓冲区 window 关闭时。
无法仅使用触发器禁用 window 末尾的第一次触发。但是,您可以检测到您正在看到第一次触发并忽略它。通过检查 Pane.IsLast().
@ProcessElement
public void processElement(ProcessContext c) {
if (!c.pane().isLast()) {
return;
}
}
对于没有延迟数据的情况,您不能在 window 结束时触发系统。系统不知道延迟数据是否会在此时到达。不过,我不认为你是专门问这个的,我只是想提一下。
尝试 post 中的解决方案:
// We first specify to never emit any panes
.triggering(Never.ever())
// We then specify to fire always when closing the window. This will emit a
// single final pane at the end of allowedLateness
.withAllowedLateness(FIVE_MINUTES, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
如代码注释中所述,您首先使用 Never.ever()
触发器,这样 window 将永远不会触发,因此当水印经过 [= 结束时也不会触发21=]。使用将覆盖触发器的关闭行为 Window.ClosingBehavior.FIRE_ALWAYS
,确保在允许的迟到之后 window 关闭时始终触发窗格。
这将导致在 10 分钟 window + 5 分钟的延迟缓冲区后触发 1 个窗格。
我正在创建一个管道,它摄取无限数据源并进行聚合计算。根据事件时间和迟到事件的 5 分钟缓冲区,计算在 10 分钟 window 内完成。 我希望在 10 分钟 window 和 5 分钟缓冲区通过后仅发出一次聚合结果。
我不知道如何让 window 只发出一次结果。我相信正确的方法是使用 AfterWatermark
触发器,但如果我使用 withLateFirings()
,结果将在 window 传递后和延迟触发持续时间传递后发出两次。如果不使用延迟触发,延迟事件将不会被计算在内,这不符合我的要求。
public class WindowFactory {
private static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
public static Window<Message> getMessageFixedWindow(Duration duration) {
return Window.<Message>into(FixedWindows.of(duration))
.triggering(
AfterWatermark
.pastEndOfWindow()
.withLateFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES)))
.discardingFiredPanes()
.withAllowedLateness(FIVE_MINUTES);
}
}
请建议我在 10 分钟 windows 和 5 分钟缓冲后只产生 1 个结果的好方法。
您现在设置的内容将触发两次,一次是在水印已经超过 window 的末尾时,一次是在延迟数据缓冲区 window 关闭时。
无法仅使用触发器禁用 window 末尾的第一次触发。但是,您可以检测到您正在看到第一次触发并忽略它。通过检查 Pane.IsLast().
@ProcessElement
public void processElement(ProcessContext c) {
if (!c.pane().isLast()) {
return;
}
}
对于没有延迟数据的情况,您不能在 window 结束时触发系统。系统不知道延迟数据是否会在此时到达。不过,我不认为你是专门问这个的,我只是想提一下。
尝试
// We first specify to never emit any panes
.triggering(Never.ever())
// We then specify to fire always when closing the window. This will emit a
// single final pane at the end of allowedLateness
.withAllowedLateness(FIVE_MINUTES, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
如代码注释中所述,您首先使用 Never.ever()
触发器,这样 window 将永远不会触发,因此当水印经过 [= 结束时也不会触发21=]。使用将覆盖触发器的关闭行为 Window.ClosingBehavior.FIRE_ALWAYS
,确保在允许的迟到之后 window 关闭时始终触发窗格。
这将导致在 10 分钟 window + 5 分钟的延迟缓冲区后触发 1 个窗格。