SideInputs 破坏数据流性能

SideInputs kill dataflow performance

我正在使用数据流生成大量数据。

我已经测试了两个版本的管道:一个带有侧面输入(不同大小),另一个没有。

当我 运行 管道 没有 侧输入时,我的工作将在大约 7 分钟内完成。当我 运行 我的工作 使用 辅助输入时,我的工作将永远无法完成。

这是我的 DoFn 的样子:

public class MyDoFn extends DoFn<String, String> {

    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
    final List<CSVRecord> stuff;

    private Aggregator<Integer, Integer> dofnCounter =
            createAggregator("DoFn Counter", new Sum.SumIntegerFn());

    public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
        this.pCollectionView = pcv;
        this.stuff = m;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);

        processContext.output(AnotherClass.generateData(stuff, pdata));

        dofnCounter.addValue(1);
    }
}

这是我的管道的样子:

final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
        .apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
            @Override
            public void processElement(ProcessContext processContext) throws Exception {

                // Parse some data

                processContext.output(KV.of(key, value));
            }
        }));

final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
        data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
                .apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());


DoFn<String, String> dofn = new MyDoFn(pcv, localList);

p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
        .apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
        .apply(TextIO.Write.named("Write_out").to(outputFile));

p.run();

我们花了大约两天的时间尝试各种方法让它发挥作用。我们已将其缩小到包含 侧输入 。如果将 processContext 修改为不使用侧输入,只要包含它,它仍然会非常慢。如果我们不调用 .withSideInput() 它又非常快了。

澄清一下,我们已经在 20mb - 1.5gb 的侧输入大小上对此进行了测试。

非常感谢任何见解。

编辑 包括一些工作 ID:

2016-01-20_14_31_12-1354600113427960103

2016-01-21_08_04_33-1642110636871153093(最新)

如果不小心使用,Dataflow SDK 中的

Side inputs 确实会导致速度变慢。大多数情况下,当每个工作人员必须 re-read 每个主要输入元素 整个侧输入 时,就会发生这种情况

您似乎正在使用通过 asMap 创建的 PCollectionView。在这种情况下,整个侧输入 PCollection 必须适合每个工人的记忆。当需要时,Dataflow SDK 会将此数据复制到每个工作人员上以创建这样的地图。

也就是说,每个工人的地图可能只创建一次或多次,具体取决于多种因素。如果它的大小足够小(通常小于 100 MB),则地图很可能每个工作人员只读取一次并跨元素和跨包重用。然而,如果它的大小不适合我们的缓存(或者其他东西将它从缓存中逐出),整个地图可能在每个 worker 上一次又一次地 re-read。这通常是速度缓慢的 root-cause。

缓存大小可通过 PipelineOptions 控制,如下所示,但由于几个重要的错误修复,这应该只在 1.3.0 及更高版本中使用。

DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);

目前,解决方法是更改​​管道的结构以避免过度 re-reading。我无法在那里为您提供具体建议,因为您没有分享有关您的用例的足够信息。 (如果需要,请 post 单独提问。)


我们正在积极开发我们称之为 分布式侧输入 的相关功能。这将允许查找辅助输入 PCollection 而无需在 worker 上构建整个地图。在这种情况和相关情况下,它应该对性能有很大帮助。我们预计很快就会发布。


我没有看到你引用的两份工作有什么特别可疑的地方。它们被取消得相对较快。

我在通过以下方式创建管道时手动设置缓存大小:

DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);

对于 ~25mb 的边输入,这大大加快了执行时间(作业 ID 2016-01-25_08_42_52-657610385797048159)与以下面的方式创建管道(作业 ID 2016-01-25_07_56_35-14864561652521586982)

  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

但是,当侧输入增加到 ~400mb 时,缓存大小的增加不会提高性能。理论上,GCE机器类型指示的内存是否都可供worker使用?什么会使工作缓存中的某些内容无效或逐出,从而强制 re-read?

请试用 Dataflow SDK 1.5.0+,它们应该已经解决了您问题的已知性能问题。

Side inputs Dataflow SDK 1.5.0+ 在 运行 批处理流水线时使用新的分布式格式。请注意,如果视图不能完全缓存在内存中,流式传输管道和使用旧版本 Dataflow SDK 的管道仍然需要重新读取侧输入。

对于新格式,我们使用索引来提供基于块的查找和缓存策略。因此,当通过索引查看列表或通过键查看映射时,只会加载包含所述索引或键的块。缓存大小大于工作集大小将有助于提高性能,因为经常访问 indices/keys 不需要重新读取它们所在的块。