GCP 数据流 - 处理 JSON 花费的时间太长

GCP dataflow - processing JSON takes too long

我正在尝试处理存储桶中的 json 个文件并将结果写入存储桶:

    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject("the-project");
    options.setStagingLocation("gs://some-bucket/temp/");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
    .apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
            try {
                JsonFactory factory = JacksonFactory.getDefaultInstance();
                String json = c.element();
                SomeClass e = factory.fromString(json, SomeClass.class);
                // manipulate the object a bit...
                c.output(factory.toString(e));
            } catch (Exception err) {
                LOG.error("Failed to process element: " + c.element(), err);
            }
        }
    }))
    .apply(TextIO.Write.to("gs://some-bucket/output/"));
    p.run();

我在路径 gs://some-bucket/2016/04/28/ 下有大约 50,000 个文件(在子目录中)。 我的问题是:这需要一个多小时才能完成是否有意义?在亚马逊的 Spark 集群上做类似的事情大约需要 15-20 分钟。我怀疑我可能做某事效率低下。

编辑:

在我的 Spark 作业中,我将所有结果汇总到一个 DataFrame 中,然后才一次写入输出。我注意到我的管道在这里分别写入每个文件,我认为这就是它花费更长时间的原因。有没有办法改变这种行为?

您的作业在 Dataflow 中遇到了几个性能问题,这是因为它针对以较大增量执行工作进行了更优化,而您的作业正在处理大量非常小的文件。因此,作业执行的某些方面最终由每个文件的开销主导。这里有一些细节和建议。

  • 这项工作受限于写入输出而不是读取输入(尽管读取输入也是一个重要部分)。您可以通过在 TextIO.Write 上指定 withNumShards 来显着减少开销,具体取决于输出中需要多少文件。例如。 100 可能是一个合理的值。默认情况下,你会得到一个未指定数量的文件,在这种情况下,给定数据流优化器的当前行为,匹配输入文件的数量:通常这是一个好主意,因为它允许我们不具体化中间数据,但在这个情况下这不是一个好主意,因为输入文件很小,每个文件的开销更重要。
  • 我建议将 maxNumWorkers 设置为一个值,例如12 - 目前第二个工作是自动扩展到过多的工人。这是由于 Dataflow 的自动缩放当前适用于以较大增量处理数据的作业 - 它目前没有考虑每个文件的开销,并且在您的情况下表现不佳。
  • 第二份工作也遇到了一个错误,因为它无法最终确定书面输出。我们正在调查,但是设置 maxNumWorkers 应该也能成功完成。

简而言之:

  • 设置maxNumWorkers=12
  • 设置TextIO.Write.to("...").withNumShards(100)

它应该 运行 好多了。