运行 相同的 DF 模板并行产生奇怪的结果

Running same DF template in parallel yields strange results

我有一个数据流作业,可以从云 SQL 中提取数据并将其加载到云存储中。我们已将作业配置为接受参数,因此我们可以使用相同的代码来提取多个表。数据流作业被编译为模板。

当我们 create/run 串行模板实例时,我们会得到我们期望的结果。但是,如果我们 create/run 个并行实例,只有少数文件会出现在 Cloud Storage 上。在这两种情况下,我们都可以看到 DF 作业已成功创建和终止。

例如,我们有 11 个实例产生 11 个输出文件。串行我们得到所有 11 个文件,并行我们只得到大约 3 个文件。在并行 运行 期间,所有 11 个实例同时 运行ning

任何人都可以就发生这种情况的原因提供一些建议吗?我假设由 DF 模板创建的临时文件在并行 运行?

期间以某种方式被覆盖

并行 运行ning 的主要动机是更快地提取数据。

编辑

管道非常简单:

        PCollection<String> results =  p
            .apply("Read from Cloud SQL", JdbcIO.<String>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create(dsDriver, dsConnection)
                        .withUsername(options.getCloudSqlUsername())
                        .withPassword(options.getCloudSqlPassword())
                )
                .withQuery(options.getCloudSqlExtractSql())
                .withRowMapper(new JdbcIO.RowMapper<String>() {
                    @Override
                    public String mapRow(ResultSet resultSet) throws Exception {
                        return mapRowToJson(resultSet);
                    }
                })
                .withCoder(StringUtf8Coder.of()));

当我编译我做的模板时

mvn compile exec:java \
 -Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
 -Dexec.args="--project=myproject \
    --region=europe-west1 \
    --stagingLocation=gs://bucket/dataflow/staging/ \
    --cloudStorageLocation=gs://bucket/data/ \
    --cloudSqlInstanceId=yyyy \
    --cloudSqlSchema=dev \
    --runner=DataflowRunner \
    --templateLocation=gs://bucket/dataflow/template/BatchIngestion"

当我调用模板时,我还提供了 "tempLocation"。我可以看到正在使用动态临时位置。尽管如此,当 运行 并行时,我没有看到所有输出文件。

谢谢

解决方案

  1. 添加唯一的临时位置
  2. 添加唯一的输出路径和文件名
  3. 在 DF 完成处理后将输出文件移动到 CS 上的最终目标