指定正确触发 10 分钟 window + 5 分钟延迟缓冲区仅产生 1 个结果

Specifying correct Triggering for 10 minutes window + 5 minutes lateness buffer producing only 1 result

我正在创建一个管道,它摄取无限数据源并进行聚合计算。根据事件时间和迟到事件的 5 分钟缓冲区,计算在 10 分钟 window 内完成。 我希望在 10 分钟 window 和 5 分钟缓冲区通过后仅发出一次聚合结果。

我不知道如何让 window 只发出一次结果。我相信正确的方法是使用 AfterWatermark 触发器,但如果我使用 withLateFirings(),结果将在 window 传递后和延迟触发持续时间传递后发出两次。如果不使用延迟触发,延迟事件将不会被计算在内,这不符合我的要求。

public class WindowFactory {
  private static final Duration FIVE_MINUTES = Duration.standardMinutes(5);

  public static Window<Message> getMessageFixedWindow(Duration duration) {
    return Window.<Message>into(FixedWindows.of(duration))
                 .triggering(
                      AfterWatermark
                        .pastEndOfWindow()
                        .withLateFirings(
                             AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(FIVE_MINUTES)))
                 .discardingFiredPanes()
                 .withAllowedLateness(FIVE_MINUTES);
  }
}

请建议我在 10 分钟 windows 和 5 分钟缓冲后只产生 1 个结果的好方法。

您现在设置的内容将触发两次,一次是在水印已经超过 window 的末尾时,一次是在延迟数据缓冲区 window 关闭时。

无法仅使用触发器禁用 window 末尾的第一次触发。但是,您可以检测到您正在看到第一次触发并忽略它。通过检查 Pane.IsLast().

@ProcessElement
public void processElement(ProcessContext c) {
  if (!c.pane().isLast()) { 
    return;
  }
}

对于没有延迟数据的情况,您不能在 window 结束时触发系统。系统不知道延迟数据是否会在此时到达。不过,我不认为你是专门问这个的,我只是想提一下。

尝试 post 中的解决方案:

 // We first specify to never emit any panes
 .triggering(Never.ever())

 // We then specify to fire always when closing the window. This will emit a
 // single final pane at the end of allowedLateness
 .withAllowedLateness(FIVE_MINUTES, Window.ClosingBehavior.FIRE_ALWAYS)
 .discardingFiredPanes())

如代码注释中所述,您首先使用 Never.ever() 触发器,这样 window 将永远不会触发,因此当水印经过 [= 结束时也不会触发21=]。使用将覆盖触发器的关闭行为 Window.ClosingBehavior.FIRE_ALWAYS,确保在允许的迟到之后 window 关闭时始终触发窗格。

这将导致在 10 分钟 window + 5 分钟的延迟缓冲区后触发 1 个窗格。