在 Google DataFlow 管道中创建并行 For 循环的正确方法
Right way to create parallel For loop in Google DataFlow pipeline
我有一个简单的 DataFlow java 作业,它从 .csv 文件中读取几行。每行包含一个数字单元格,表示必须在该行上执行特定功能的步骤数。
我不想在函数内使用传统的 For 循环来执行该操作,以防这些数字变得非常大。使用并行友好的 DataFlow 方法来执行此操作的正确方法是什么?
这是当前的 Java 代码:
public class SimpleJob{
static class MyDoFn extends DoFn<String, Integer> {
public void processElement(ProcessContext c) {
String name = c.element().split("\,")[0];
int val = Integer.valueOf(c.element().split("\,")[1]);
for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF?
System.out.println("Processing some function: " + name); // <- do something
c.output(val);
}
}
public static void main() {
DataflowPipelineOptions options = PipelineOptionsFactory
.as(DataflowPipelineOptions.class);
options.setProject(DEF.ID_PROJ);
options.setStagingLocation(DEF.ID_STG_LOC);
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from("Source.csv"))
.apply(ParDo.of(new MyDoFn()));
pipeline.run();
}
}
这就是 "source.csv" 的样子(所以每个数字代表我想要 运行 行上的并行函数的次数):
乔,3
玛丽,4
彼得,2
奇怪的是,这是 Splittable DoFn 的激励用例之一! API 目前正在大力开发中。
但是,在 API 可用之前,您基本上可以模仿它为您完成的大部分操作:
class ElementAndRepeats { String element; int numRepeats; }
PCollection<String> lines = p.apply(TextIO.Read....)
PCollection<ElementAndRepeats> elementAndNumRepeats = lines.apply(
ParDo.of(...parse number of repetitions from the line...));
PCollection<ElementAndRepeats> elementAndNumSubRepeats = elementAndNumRepeats
.apply(ParDo.of(
...split large numbers of repetitions into smaller numbers...))
.apply(...fusion break...);
elementAndNumSubRepeats.apply(ParDo.of(...execute the repetitions...))
其中:
- "split large numbers of repetitions" 是一个 DoFn,例如,将
ElementAndRepeats{"foo", 34}
拆分为 {ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 4}}
- 融合中断 - 请参阅 here,以防止多个 ParDo 融合在一起,破坏并行化
我有一个简单的 DataFlow java 作业,它从 .csv 文件中读取几行。每行包含一个数字单元格,表示必须在该行上执行特定功能的步骤数。
我不想在函数内使用传统的 For 循环来执行该操作,以防这些数字变得非常大。使用并行友好的 DataFlow 方法来执行此操作的正确方法是什么?
这是当前的 Java 代码:
public class SimpleJob{
static class MyDoFn extends DoFn<String, Integer> {
public void processElement(ProcessContext c) {
String name = c.element().split("\,")[0];
int val = Integer.valueOf(c.element().split("\,")[1]);
for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF?
System.out.println("Processing some function: " + name); // <- do something
c.output(val);
}
}
public static void main() {
DataflowPipelineOptions options = PipelineOptionsFactory
.as(DataflowPipelineOptions.class);
options.setProject(DEF.ID_PROJ);
options.setStagingLocation(DEF.ID_STG_LOC);
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from("Source.csv"))
.apply(ParDo.of(new MyDoFn()));
pipeline.run();
}
}
这就是 "source.csv" 的样子(所以每个数字代表我想要 运行 行上的并行函数的次数):
乔,3
玛丽,4
彼得,2
奇怪的是,这是 Splittable DoFn 的激励用例之一! API 目前正在大力开发中。
但是,在 API 可用之前,您基本上可以模仿它为您完成的大部分操作:
class ElementAndRepeats { String element; int numRepeats; }
PCollection<String> lines = p.apply(TextIO.Read....)
PCollection<ElementAndRepeats> elementAndNumRepeats = lines.apply(
ParDo.of(...parse number of repetitions from the line...));
PCollection<ElementAndRepeats> elementAndNumSubRepeats = elementAndNumRepeats
.apply(ParDo.of(
...split large numbers of repetitions into smaller numbers...))
.apply(...fusion break...);
elementAndNumSubRepeats.apply(ParDo.of(...execute the repetitions...))
其中:
- "split large numbers of repetitions" 是一个 DoFn,例如,将
ElementAndRepeats{"foo", 34}
拆分为{ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 4}}
- 融合中断 - 请参阅 here,以防止多个 ParDo 融合在一起,破坏并行化