在 Google DataFlow 管道中创建并行 For 循环的正确方法

Right way to create parallel For loop in Google DataFlow pipeline

我有一个简单的 DataFlow java 作业,它从 .csv 文件中读取几行。每行包含一个数字单元格,表示必须在该行上执行特定功能的步骤数。

我不想在函数内使用传统的 For 循环来执行该操作,以防这些数字变得非常大。使用并行友好的 DataFlow 方法来执行此操作的正确方法是什么?

这是当前的 Java 代码:

public class SimpleJob{

    static class MyDoFn extends DoFn<String, Integer> {

        public void processElement(ProcessContext c) {
            String name = c.element().split("\,")[0];
            int val = Integer.valueOf(c.element().split("\,")[1]);
            for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF?
                System.out.println("Processing some function: " + name); // <- do something
            c.output(val);
        }

    }

    public static void main() {

        DataflowPipelineOptions options = PipelineOptionsFactory
                .as(DataflowPipelineOptions.class);
        options.setProject(DEF.ID_PROJ);
        options.setStagingLocation(DEF.ID_STG_LOC);
        options.setRunner(DirectPipelineRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("Source.csv"))
                .apply(ParDo.of(new MyDoFn()));

        pipeline.run();
    }
}

这就是 "source.csv" 的样子(所以每个数字代表我想要 运行 行上的并行函数的次数):

乔,3
玛丽,4
彼得,2

奇怪的是,这是 Splittable DoFn 的激励用例之一! API 目前正在大力开发中。

但是,在 API 可用之前,您基本上可以模仿它为您完成的大部分操作:

class ElementAndRepeats { String element; int numRepeats; }
PCollection<String> lines = p.apply(TextIO.Read....)
PCollection<ElementAndRepeats> elementAndNumRepeats = lines.apply(
    ParDo.of(...parse number of repetitions from the line...));
PCollection<ElementAndRepeats> elementAndNumSubRepeats = elementAndNumRepeats
    .apply(ParDo.of(
        ...split large numbers of repetitions into smaller numbers...))
    .apply(...fusion break...);
elementAndNumSubRepeats.apply(ParDo.of(...execute the repetitions...))

其中:

  • "split large numbers of repetitions" 是一个 DoFn,例如,将 ElementAndRepeats{"foo", 34} 拆分为 {ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 4}}
  • 融合中断 - 请参阅 here,以防止多个 ParDo 融合在一起,破坏并行化