延迟数据处理 |阿帕奇梁
Late data handling | Apache Beam
错过 window 和 .withAllowedLateness
时间段的延迟数据将从管道中删除,如记录 here
我对这种行为有几个问题:
- 如何处理从管道中丢弃的延迟数据?我们可以添加默认行为吗?说所有迟到的数据都应该记录在某个地方,比如万能桶?
- 我们能否有一个指标(Google 数据流 Metrics/Beam)来说明这些消息中有多少由于巨大的延迟而从管道中丢失?
- 一般来说,我们将延迟数据定义为元素,当它们到达时,我们更愿意丢弃它们,不想进一步处理。据我所知,添加额外的功能来处理这些消息需要花费大量精力来修改 Java SDK。但是,如果您只想记录它们,这是由
LateDataDroppingDoFnRunner
code, which is responsible 完成的,用于从过期的 windows: 中删除数据
for (WindowedValue<InputT> input : concatElements) {
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
if (canDropDueToExpiredWindow(window)) {
// The element is too late for this window.
droppedDueToLateness.inc();
WindowTracing.debug(
"{}: Dropping element at {} for key:{}; window:{} "
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
LateDataFilter.class.getSimpleName(),
input.getTimestamp(),
key,
window,
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
}
}
请注意,日志的级别为 DEBUG
,因此您可能看不到它。如 here 所述,要覆盖 Dataflow 中的级别,您可以使用 --defaultWorkerLogLevel=DEBUG
,或者更好的是,指定特定的 class,例如 --workerLogLevelOverrides={"org.apache.beam.sdk.util.WindowTracing":"DEBUG"}
。明智地选择密钥可以帮助公开信息以识别丢失的消息(即数据沿袭)。
- 正如在前面的代码片段中所见,
droppedDueToLateness
是一个计数器指标,每次我们删除一个元素时它都会递增:droppedDueToLateness.inc();
。您可以使用具有资源类型 dataflow_job
和指标 custom.googleapis.com/dataflow/droppedDueToLateness
. 的 Stackdriver 对其进行监控
错过 window 和 .withAllowedLateness
时间段的延迟数据将从管道中删除,如记录 here
我对这种行为有几个问题:
- 如何处理从管道中丢弃的延迟数据?我们可以添加默认行为吗?说所有迟到的数据都应该记录在某个地方,比如万能桶?
- 我们能否有一个指标(Google 数据流 Metrics/Beam)来说明这些消息中有多少由于巨大的延迟而从管道中丢失?
- 一般来说,我们将延迟数据定义为元素,当它们到达时,我们更愿意丢弃它们,不想进一步处理。据我所知,添加额外的功能来处理这些消息需要花费大量精力来修改 Java SDK。但是,如果您只想记录它们,这是由
LateDataDroppingDoFnRunner
code, which is responsible 完成的,用于从过期的 windows: 中删除数据
for (WindowedValue<InputT> input : concatElements) {
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
if (canDropDueToExpiredWindow(window)) {
// The element is too late for this window.
droppedDueToLateness.inc();
WindowTracing.debug(
"{}: Dropping element at {} for key:{}; window:{} "
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
LateDataFilter.class.getSimpleName(),
input.getTimestamp(),
key,
window,
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
}
}
请注意,日志的级别为 DEBUG
,因此您可能看不到它。如 here 所述,要覆盖 Dataflow 中的级别,您可以使用 --defaultWorkerLogLevel=DEBUG
,或者更好的是,指定特定的 class,例如 --workerLogLevelOverrides={"org.apache.beam.sdk.util.WindowTracing":"DEBUG"}
。明智地选择密钥可以帮助公开信息以识别丢失的消息(即数据沿袭)。
- 正如在前面的代码片段中所见,
droppedDueToLateness
是一个计数器指标,每次我们删除一个元素时它都会递增:droppedDueToLateness.inc();
。您可以使用具有资源类型dataflow_job
和指标custom.googleapis.com/dataflow/droppedDueToLateness
. 的 Stackdriver 对其进行监控