运行 相同的 DF 模板并行产生奇怪的结果
Running same DF template in parallel yields strange results
我有一个数据流作业,可以从云 SQL 中提取数据并将其加载到云存储中。我们已将作业配置为接受参数,因此我们可以使用相同的代码来提取多个表。数据流作业被编译为模板。
当我们 create/run 串行模板实例时,我们会得到我们期望的结果。但是,如果我们 create/run 个并行实例,只有少数文件会出现在 Cloud Storage 上。在这两种情况下,我们都可以看到 DF 作业已成功创建和终止。
例如,我们有 11 个实例产生 11 个输出文件。串行我们得到所有 11 个文件,并行我们只得到大约 3 个文件。在并行 运行 期间,所有 11 个实例同时 运行ning
任何人都可以就发生这种情况的原因提供一些建议吗?我假设由 DF 模板创建的临时文件在并行 运行?
期间以某种方式被覆盖
并行 运行ning 的主要动机是更快地提取数据。
编辑
管道非常简单:
PCollection<String> results = p
.apply("Read from Cloud SQL", JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(dsDriver, dsConnection)
.withUsername(options.getCloudSqlUsername())
.withPassword(options.getCloudSqlPassword())
)
.withQuery(options.getCloudSqlExtractSql())
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
return mapRowToJson(resultSet);
}
})
.withCoder(StringUtf8Coder.of()));
当我编译我做的模板时
mvn compile exec:java \
-Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
-Dexec.args="--project=myproject \
--region=europe-west1 \
--stagingLocation=gs://bucket/dataflow/staging/ \
--cloudStorageLocation=gs://bucket/data/ \
--cloudSqlInstanceId=yyyy \
--cloudSqlSchema=dev \
--runner=DataflowRunner \
--templateLocation=gs://bucket/dataflow/template/BatchIngestion"
当我调用模板时,我还提供了 "tempLocation"。我可以看到正在使用动态临时位置。尽管如此,当 运行 并行时,我没有看到所有输出文件。
谢谢
解决方案
- 添加唯一的临时位置
- 添加唯一的输出路径和文件名
- 在 DF 完成处理后将输出文件移动到 CS 上的最终目标
我有一个数据流作业,可以从云 SQL 中提取数据并将其加载到云存储中。我们已将作业配置为接受参数,因此我们可以使用相同的代码来提取多个表。数据流作业被编译为模板。
当我们 create/run 串行模板实例时,我们会得到我们期望的结果。但是,如果我们 create/run 个并行实例,只有少数文件会出现在 Cloud Storage 上。在这两种情况下,我们都可以看到 DF 作业已成功创建和终止。
例如,我们有 11 个实例产生 11 个输出文件。串行我们得到所有 11 个文件,并行我们只得到大约 3 个文件。在并行 运行 期间,所有 11 个实例同时 运行ning
任何人都可以就发生这种情况的原因提供一些建议吗?我假设由 DF 模板创建的临时文件在并行 运行?
期间以某种方式被覆盖并行 运行ning 的主要动机是更快地提取数据。
编辑
管道非常简单:
PCollection<String> results = p
.apply("Read from Cloud SQL", JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(dsDriver, dsConnection)
.withUsername(options.getCloudSqlUsername())
.withPassword(options.getCloudSqlPassword())
)
.withQuery(options.getCloudSqlExtractSql())
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
return mapRowToJson(resultSet);
}
})
.withCoder(StringUtf8Coder.of()));
当我编译我做的模板时
mvn compile exec:java \
-Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
-Dexec.args="--project=myproject \
--region=europe-west1 \
--stagingLocation=gs://bucket/dataflow/staging/ \
--cloudStorageLocation=gs://bucket/data/ \
--cloudSqlInstanceId=yyyy \
--cloudSqlSchema=dev \
--runner=DataflowRunner \
--templateLocation=gs://bucket/dataflow/template/BatchIngestion"
当我调用模板时,我还提供了 "tempLocation"。我可以看到正在使用动态临时位置。尽管如此,当 运行 并行时,我没有看到所有输出文件。
谢谢
解决方案
- 添加唯一的临时位置
- 添加唯一的输出路径和文件名
- 在 DF 完成处理后将输出文件移动到 CS 上的最终目标