GCP 数据流 - 处理 JSON 花费的时间太长
GCP dataflow - processing JSON takes too long
我正在尝试处理存储桶中的 json 个文件并将结果写入存储桶:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("the-project");
options.setStagingLocation("gs://some-bucket/temp/");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
.apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
try {
JsonFactory factory = JacksonFactory.getDefaultInstance();
String json = c.element();
SomeClass e = factory.fromString(json, SomeClass.class);
// manipulate the object a bit...
c.output(factory.toString(e));
} catch (Exception err) {
LOG.error("Failed to process element: " + c.element(), err);
}
}
}))
.apply(TextIO.Write.to("gs://some-bucket/output/"));
p.run();
我在路径 gs://some-bucket/2016/04/28/ 下有大约 50,000 个文件(在子目录中)。
我的问题是:这需要一个多小时才能完成是否有意义?在亚马逊的 Spark 集群上做类似的事情大约需要 15-20 分钟。我怀疑我可能做某事效率低下。
编辑:
在我的 Spark 作业中,我将所有结果汇总到一个 DataFrame 中,然后才一次写入输出。我注意到我的管道在这里分别写入每个文件,我认为这就是它花费更长时间的原因。有没有办法改变这种行为?
您的作业在 Dataflow 中遇到了几个性能问题,这是因为它针对以较大增量执行工作进行了更优化,而您的作业正在处理大量非常小的文件。因此,作业执行的某些方面最终由每个文件的开销主导。这里有一些细节和建议。
- 这项工作受限于写入输出而不是读取输入(尽管读取输入也是一个重要部分)。您可以通过在
TextIO.Write
上指定 withNumShards
来显着减少开销,具体取决于输出中需要多少文件。例如。 100
可能是一个合理的值。默认情况下,你会得到一个未指定数量的文件,在这种情况下,给定数据流优化器的当前行为,匹配输入文件的数量:通常这是一个好主意,因为它允许我们不具体化中间数据,但在这个情况下这不是一个好主意,因为输入文件很小,每个文件的开销更重要。
- 我建议将
maxNumWorkers
设置为一个值,例如12 - 目前第二个工作是自动扩展到过多的工人。这是由于 Dataflow 的自动缩放当前适用于以较大增量处理数据的作业 - 它目前没有考虑每个文件的开销,并且在您的情况下表现不佳。
- 第二份工作也遇到了一个错误,因为它无法最终确定书面输出。我们正在调查,但是设置
maxNumWorkers
应该也能成功完成。
简而言之:
- 设置
maxNumWorkers=12
- 设置
TextIO.Write.to("...").withNumShards(100)
它应该 运行 好多了。
我正在尝试处理存储桶中的 json 个文件并将结果写入存储桶:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("the-project");
options.setStagingLocation("gs://some-bucket/temp/");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
.apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
try {
JsonFactory factory = JacksonFactory.getDefaultInstance();
String json = c.element();
SomeClass e = factory.fromString(json, SomeClass.class);
// manipulate the object a bit...
c.output(factory.toString(e));
} catch (Exception err) {
LOG.error("Failed to process element: " + c.element(), err);
}
}
}))
.apply(TextIO.Write.to("gs://some-bucket/output/"));
p.run();
我在路径 gs://some-bucket/2016/04/28/ 下有大约 50,000 个文件(在子目录中)。 我的问题是:这需要一个多小时才能完成是否有意义?在亚马逊的 Spark 集群上做类似的事情大约需要 15-20 分钟。我怀疑我可能做某事效率低下。
编辑:
在我的 Spark 作业中,我将所有结果汇总到一个 DataFrame 中,然后才一次写入输出。我注意到我的管道在这里分别写入每个文件,我认为这就是它花费更长时间的原因。有没有办法改变这种行为?
您的作业在 Dataflow 中遇到了几个性能问题,这是因为它针对以较大增量执行工作进行了更优化,而您的作业正在处理大量非常小的文件。因此,作业执行的某些方面最终由每个文件的开销主导。这里有一些细节和建议。
- 这项工作受限于写入输出而不是读取输入(尽管读取输入也是一个重要部分)。您可以通过在
TextIO.Write
上指定withNumShards
来显着减少开销,具体取决于输出中需要多少文件。例如。100
可能是一个合理的值。默认情况下,你会得到一个未指定数量的文件,在这种情况下,给定数据流优化器的当前行为,匹配输入文件的数量:通常这是一个好主意,因为它允许我们不具体化中间数据,但在这个情况下这不是一个好主意,因为输入文件很小,每个文件的开销更重要。 - 我建议将
maxNumWorkers
设置为一个值,例如12 - 目前第二个工作是自动扩展到过多的工人。这是由于 Dataflow 的自动缩放当前适用于以较大增量处理数据的作业 - 它目前没有考虑每个文件的开销,并且在您的情况下表现不佳。 - 第二份工作也遇到了一个错误,因为它无法最终确定书面输出。我们正在调查,但是设置
maxNumWorkers
应该也能成功完成。
简而言之:
- 设置
maxNumWorkers=12
- 设置
TextIO.Write.to("...").withNumShards(100)
它应该 运行 好多了。