SideInputs 破坏数据流性能
SideInputs kill dataflow performance
我正在使用数据流生成大量数据。
我已经测试了两个版本的管道:一个带有侧面输入(不同大小),另一个没有。
当我 运行 管道 没有 侧输入时,我的工作将在大约 7 分钟内完成。当我 运行 我的工作 使用 辅助输入时,我的工作将永远无法完成。
这是我的 DoFn 的样子:
public class MyDoFn extends DoFn<String, String> {
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
final List<CSVRecord> stuff;
private Aggregator<Integer, Integer> dofnCounter =
createAggregator("DoFn Counter", new Sum.SumIntegerFn());
public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
this.pCollectionView = pcv;
this.stuff = m;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);
processContext.output(AnotherClass.generateData(stuff, pdata));
dofnCounter.addValue(1);
}
}
这是我的管道的样子:
final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
.apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
@Override
public void processElement(ProcessContext processContext) throws Exception {
// Parse some data
processContext.output(KV.of(key, value));
}
}));
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
.apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());
DoFn<String, String> dofn = new MyDoFn(pcv, localList);
p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
.apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
.apply(TextIO.Write.named("Write_out").to(outputFile));
p.run();
我们花了大约两天的时间尝试各种方法让它发挥作用。我们已将其缩小到包含 侧输入 。如果将 processContext 修改为不使用侧输入,只要包含它,它仍然会非常慢。如果我们不调用 .withSideInput() 它又非常快了。
澄清一下,我们已经在 20mb - 1.5gb 的侧输入大小上对此进行了测试。
非常感谢任何见解。
编辑
包括一些工作 ID:
2016-01-20_14_31_12-1354600113427960103
2016-01-21_08_04_33-1642110636871153093(最新)
如果不小心使用,Dataflow SDK 中的 Side inputs 确实会导致速度变慢。大多数情况下,当每个工作人员必须 re-read 每个主要输入元素 整个侧输入 时,就会发生这种情况 。
您似乎正在使用通过 asMap
创建的 PCollectionView
。在这种情况下,整个侧输入 PCollection
必须适合每个工人的记忆。当需要时,Dataflow SDK 会将此数据复制到每个工作人员上以创建这样的地图。
也就是说,每个工人的地图可能只创建一次或多次,具体取决于多种因素。如果它的大小足够小(通常小于 100 MB),则地图很可能每个工作人员只读取一次并跨元素和跨包重用。然而,如果它的大小不适合我们的缓存(或者其他东西将它从缓存中逐出),整个地图可能在每个 worker 上一次又一次地 re-read。这通常是速度缓慢的 root-cause。
缓存大小可通过 PipelineOptions
控制,如下所示,但由于几个重要的错误修复,这应该只在 1.3.0 及更高版本中使用。
DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);
目前,解决方法是更改管道的结构以避免过度 re-reading。我无法在那里为您提供具体建议,因为您没有分享有关您的用例的足够信息。 (如果需要,请 post 单独提问。)
我们正在积极开发我们称之为 分布式侧输入 的相关功能。这将允许查找辅助输入 PCollection
而无需在 worker 上构建整个地图。在这种情况和相关情况下,它应该对性能有很大帮助。我们预计很快就会发布。
我没有看到你引用的两份工作有什么特别可疑的地方。它们被取消得相对较快。
我在通过以下方式创建管道时手动设置缓存大小:
DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);
对于 ~25mb 的边输入,这大大加快了执行时间(作业 ID 2016-01-25_08_42_52-657610385797048159)与以下面的方式创建管道(作业 ID 2016-01-25_07_56_35-14864561652521586982)
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
但是,当侧输入增加到 ~400mb 时,缓存大小的增加不会提高性能。理论上,GCE机器类型指示的内存是否都可供worker使用?什么会使工作缓存中的某些内容无效或逐出,从而强制 re-read?
请试用 Dataflow SDK 1.5.0+,它们应该已经解决了您问题的已知性能问题。
Side inputs Dataflow SDK 1.5.0+ 在 运行 批处理流水线时使用新的分布式格式。请注意,如果视图不能完全缓存在内存中,流式传输管道和使用旧版本 Dataflow SDK 的管道仍然需要重新读取侧输入。
对于新格式,我们使用索引来提供基于块的查找和缓存策略。因此,当通过索引查看列表或通过键查看映射时,只会加载包含所述索引或键的块。缓存大小大于工作集大小将有助于提高性能,因为经常访问 indices/keys 不需要重新读取它们所在的块。
我正在使用数据流生成大量数据。
我已经测试了两个版本的管道:一个带有侧面输入(不同大小),另一个没有。
当我 运行 管道 没有 侧输入时,我的工作将在大约 7 分钟内完成。当我 运行 我的工作 使用 辅助输入时,我的工作将永远无法完成。
这是我的 DoFn 的样子:
public class MyDoFn extends DoFn<String, String> {
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
final List<CSVRecord> stuff;
private Aggregator<Integer, Integer> dofnCounter =
createAggregator("DoFn Counter", new Sum.SumIntegerFn());
public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
this.pCollectionView = pcv;
this.stuff = m;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);
processContext.output(AnotherClass.generateData(stuff, pdata));
dofnCounter.addValue(1);
}
}
这是我的管道的样子:
final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
.apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
@Override
public void processElement(ProcessContext processContext) throws Exception {
// Parse some data
processContext.output(KV.of(key, value));
}
}));
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
.apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());
DoFn<String, String> dofn = new MyDoFn(pcv, localList);
p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
.apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
.apply(TextIO.Write.named("Write_out").to(outputFile));
p.run();
我们花了大约两天的时间尝试各种方法让它发挥作用。我们已将其缩小到包含 侧输入 。如果将 processContext 修改为不使用侧输入,只要包含它,它仍然会非常慢。如果我们不调用 .withSideInput() 它又非常快了。
澄清一下,我们已经在 20mb - 1.5gb 的侧输入大小上对此进行了测试。
非常感谢任何见解。
编辑 包括一些工作 ID:
2016-01-20_14_31_12-1354600113427960103
2016-01-21_08_04_33-1642110636871153093(最新)
Side inputs 确实会导致速度变慢。大多数情况下,当每个工作人员必须 re-read 每个主要输入元素 整个侧输入 时,就会发生这种情况 。
您似乎正在使用通过 asMap
创建的 PCollectionView
。在这种情况下,整个侧输入 PCollection
必须适合每个工人的记忆。当需要时,Dataflow SDK 会将此数据复制到每个工作人员上以创建这样的地图。
也就是说,每个工人的地图可能只创建一次或多次,具体取决于多种因素。如果它的大小足够小(通常小于 100 MB),则地图很可能每个工作人员只读取一次并跨元素和跨包重用。然而,如果它的大小不适合我们的缓存(或者其他东西将它从缓存中逐出),整个地图可能在每个 worker 上一次又一次地 re-read。这通常是速度缓慢的 root-cause。
缓存大小可通过 PipelineOptions
控制,如下所示,但由于几个重要的错误修复,这应该只在 1.3.0 及更高版本中使用。
DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);
目前,解决方法是更改管道的结构以避免过度 re-reading。我无法在那里为您提供具体建议,因为您没有分享有关您的用例的足够信息。 (如果需要,请 post 单独提问。)
我们正在积极开发我们称之为 分布式侧输入 的相关功能。这将允许查找辅助输入 PCollection
而无需在 worker 上构建整个地图。在这种情况和相关情况下,它应该对性能有很大帮助。我们预计很快就会发布。
我没有看到你引用的两份工作有什么特别可疑的地方。它们被取消得相对较快。
我在通过以下方式创建管道时手动设置缓存大小:
DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);
对于 ~25mb 的边输入,这大大加快了执行时间(作业 ID 2016-01-25_08_42_52-657610385797048159)与以下面的方式创建管道(作业 ID 2016-01-25_07_56_35-14864561652521586982)
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
但是,当侧输入增加到 ~400mb 时,缓存大小的增加不会提高性能。理论上,GCE机器类型指示的内存是否都可供worker使用?什么会使工作缓存中的某些内容无效或逐出,从而强制 re-read?
请试用 Dataflow SDK 1.5.0+,它们应该已经解决了您问题的已知性能问题。
Side inputs Dataflow SDK 1.5.0+ 在 运行 批处理流水线时使用新的分布式格式。请注意,如果视图不能完全缓存在内存中,流式传输管道和使用旧版本 Dataflow SDK 的管道仍然需要重新读取侧输入。
对于新格式,我们使用索引来提供基于块的查找和缓存策略。因此,当通过索引查看列表或通过键查看映射时,只会加载包含所述索引或键的块。缓存大小大于工作集大小将有助于提高性能,因为经常访问 indices/keys 不需要重新读取它们所在的块。