运行 Dataflow Batch-Job 步骤完成后运行

Running function once Dataflow Batch-Job step has completed

我有一个具有扇出步骤的数据流作业,每个步骤都将结果写入 GCS 上的不同文件夹。在批处理作业执行期间,每个文件夹写入数百个文件。

我想确定 FileIO 步骤何时完成,以便 运行 java 代码将文件夹的全部内容加载到 BigQuery table。

我知道我可以使用 Cloud Functions 和 PubSub 通知对每个写入的文件执行此操作,但我更喜欢只在完成整个文件夹时执行一次。

谢谢!

有两种方法可以做到这一点:

在管道之后执行它。

运行 您的管道和管道结果,调用 waitUntilFinish(Python 中的 wait_until_finish)以延迟执行,直到您的管道完成后,如下所示:

pipeline.run().waitUntilFinish();

您可以根据 waitUntilFinish 的结果验证管道是否成功完成,然后您可以从那里将文件夹的内容加载到 BigQuery。这种方法的唯一警告是您的代码不是 Dataflow 管道的一部分,因此如果您在该步骤中依赖管道中的元素,它将变得更加困难。

在 FileIO.Write

之后添加变换

FileIO.Write 转换的结果是 WriteFilesResult,它允许您通过调用 getPerDestinationOutputFilenames 获得包含写入文件的所有文件名的 PCollection。从那里,您可以继续使用可以将所有这些文件写入 BigQuery 的转换的管道。这是 Java 中的示例:

WriteFilesResult<DestinationT> result = files.apply(FileIO.write()...)
result.getPerDestinationOutputFilenames().apply(...)

Python 中的等价物似乎叫做 FileResult,但我找不到关于那个的好文档。

@Daniel Oliveira 建议了一种您可以遵循的方法,但在我看来它不是最好的方法。

我不同意他的两个原因:

  1. Narrow scope for handling job failures : Consider a situation where your Dataflow job succeeded but your loading to Big Query job failed. Due to this tight coupling you won't be able to re-run the second job.
  2. Performance of second job will become bottleneck : In a production scenario when your file size will grow, your load job will become bottleneck for other dependent process

正如您已经提到的,您不能在同一作业中直接写入 BQ。我会建议您采用以下方法:

  1. Create another beam job for loading all the file to BQ. You can refer for reading multiple files in beam.
  2. Orchestrate both the code with cloud composer using Dataflow Java Operator or Dataflow Template Operator . Set airflow trigger rule as 'all_sucess' and set job1.setUpstream(job2). Please refer airflow documentation here

希望对您有所帮助