延迟数据处理 |阿帕奇梁

Late data handling | Apache Beam

错过 window 和 .withAllowedLateness 时间段的延迟数据将从管道中删除,如记录 here

我对这种行为有几个问题:

  1. 如何处理从管道中丢弃的延迟数据?我们可以添加默认行为吗?说所有迟到的数据都应该记录在某个地方,比如万能桶?
  2. 我们能否有一个指标(Google 数据流 Metrics/Beam)来说明这些消息中有多少由于巨大的延迟而从管道中丢失?
  1. 一般来说,我们将延迟数据定义为元素,当它们到达时,我们更愿意丢弃它们,不想进一步处理。据我所知,添加额外的功能来处理这些消息需要花费大量精力来修改 Java SDK。但是,如果您只想记录它们,这是由 LateDataDroppingDoFnRunner code, which is responsible 完成的,用于从过期的 windows:
  2. 中删除数据
for (WindowedValue<InputT> input : concatElements) {
  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
  if (canDropDueToExpiredWindow(window)) {
    // The element is too late for this window.
    droppedDueToLateness.inc();
    WindowTracing.debug(
        "{}: Dropping element at {} for key:{}; window:{} "
            + "since too far behind inputWatermark:{}; outputWatermark:{}",
        LateDataFilter.class.getSimpleName(),
        input.getTimestamp(),
        key,
        window,
        timerInternals.currentInputWatermarkTime(),
        timerInternals.currentOutputWatermarkTime());
  }
}

请注意,日志的级别为 DEBUG,因此您可能看不到它。如 here 所述,要覆盖 Dataflow 中的级别,您可以使用 --defaultWorkerLogLevel=DEBUG,或者更好的是,指定特定的 class,例如 --workerLogLevelOverrides={"org.apache.beam.sdk.util.WindowTracing":"DEBUG"}。明智地选择密钥可以帮助公开信息以识别丢失的消息(即数据沿袭)。

  1. 正如在前面的代码片段中所见,droppedDueToLateness 是一个计数器指标,每次我们删除一个元素时它都会递增:droppedDueToLateness.inc();。您可以使用具有资源类型 dataflow_job 和指标 custom.googleapis.com/dataflow/droppedDueToLateness.
  2. 的 Stackdriver 对其进行监控