在 google 数据流管道末尾将聚合器刷新到 GCS 的任何更简单的方法

Any easier way to flush aggregator to GCS at the end of google dataflow pipeline

我正在使用 Aggregator 记录数据流作业的一些运行时统计信息,我想在管道完成(或每个转换器完成)时将它们刷新到 GCS 或 BQ。

目前,除了使用 Aggregator 之外,我还通过同时利用 tupleTag 创建副输出并刷新副输出 PCollection。 但是我想知道是否有任何其他方便的方法可以直接刷新聚合器本身?

您使用辅助输出 PCollection 的方法应该产生与使用聚合器语义相同的结果。 (例如,当捆绑失败并且必须重试时,聚合器和侧输出都不会包含重复值。)主要区别在于聚合器的部分结果在管道执行期间在监控 UI 中以编程方式提供。

在 Java 内,您可以使用 PipelineResult.getAggregatorValues()。如果您从 [non-blocking]DataflowPipelineRunner 获得 PipelineResult,这将允许您在作业运行时查询聚合器。如果您使用 BlockingDataflowPipelineRunnerPipeline.run() 块,并且在作业完成之前您不会获得 PipelineResult。

还有commandline支持:gcloud alpha dataflow metrics tail JOB_ID