在 google 数据流管道末尾将聚合器刷新到 GCS 的任何更简单的方法
Any easier way to flush aggregator to GCS at the end of google dataflow pipeline
我正在使用 Aggregator
记录数据流作业的一些运行时统计信息,我想在管道完成(或每个转换器完成)时将它们刷新到 GCS 或 BQ。
目前,除了使用 Aggregator
之外,我还通过同时利用 tupleTag 创建副输出并刷新副输出 PCollection。
但是我想知道是否有任何其他方便的方法可以直接刷新聚合器本身?
您使用辅助输出 PCollection 的方法应该产生与使用聚合器语义相同的结果。 (例如,当捆绑失败并且必须重试时,聚合器和侧输出都不会包含重复值。)主要区别在于聚合器的部分结果在管道执行期间在监控 UI 中以编程方式提供。
在 Java 内,您可以使用 PipelineResult.getAggregatorValues()。如果您从 [non-blocking]DataflowPipelineRunner
获得 PipelineResult,这将允许您在作业运行时查询聚合器。如果您使用 BlockingDataflowPipelineRunner
、Pipeline.run()
块,并且在作业完成之前您不会获得 PipelineResult。
还有commandline支持:gcloud alpha dataflow metrics tail JOB_ID
我正在使用 Aggregator
记录数据流作业的一些运行时统计信息,我想在管道完成(或每个转换器完成)时将它们刷新到 GCS 或 BQ。
目前,除了使用 Aggregator
之外,我还通过同时利用 tupleTag 创建副输出并刷新副输出 PCollection。
但是我想知道是否有任何其他方便的方法可以直接刷新聚合器本身?
您使用辅助输出 PCollection 的方法应该产生与使用聚合器语义相同的结果。 (例如,当捆绑失败并且必须重试时,聚合器和侧输出都不会包含重复值。)主要区别在于聚合器的部分结果在管道执行期间在监控 UI 中以编程方式提供。
在 Java 内,您可以使用 PipelineResult.getAggregatorValues()。如果您从 [non-blocking]DataflowPipelineRunner
获得 PipelineResult,这将允许您在作业运行时查询聚合器。如果您使用 BlockingDataflowPipelineRunner
、Pipeline.run()
块,并且在作业完成之前您不会获得 PipelineResult。
还有commandline支持:gcloud alpha dataflow metrics tail JOB_ID