将 PCollection<T> 组合成 PCollection<Iterable<T>> 的简单方法
Simple approach to combine PCollection<T> into PCollection<Iterable<T>>
我正在使用 Google Cloud Dataflow,并且有一个 ParDo 函数需要访问 PCollection 中的所有元素。为此,我想将 PCollection 转换为包含所有元素的单个 Iterable 的 PCollection>。我想知道是否有 cleaner/simpler/faster 解决方案可以解决我的问题。
第一种方法是创建一个虚拟键,执行 GroupByKey,然后获取值。
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
第二种方法遵循此处的建议:How do I make View's asList() sortable in Google Dataflow SDK? 但没有排序。我创建了一个 View.asList(),创建了一个虚拟 PCollection,然后在虚拟 PCollection 上应用了一个 ParDo 函数,并将视图作为辅助输入并简单地返回了视图。
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
似乎有针对此任务的预定义组合函数,但我找不到。感谢您的帮助!
如果您知道自己需要全部,那么您的两种方法都是合理的。两者都已在 Dataflow SDK 中使用,后来成为 Apache Beam SDK。
- 边输入然后输出整个东西:事实上,这就是
DataflowAssert
的工作方式。在 Beam 中,不同的后端运行器可能会以不同的方式实现侧输入,您应该更喜欢 View.asIterable()
,因为它的假设更少,并且可能允许更多的流式传输非常大的侧输入。
- 按单个键分组,然后放下键:这就是 Beam 的继任者
PAssert
的工作方式。它完成同样的事情,需要更多地关注空集合,但更多的 Beam runners 比侧输入支持更好 GroupByKey
支持(尤其是当它们是新的并且仍在开发中时)。
所以 View.asIterable()
基本上是为了满足您的要求。还有一些请求 GroupGlobally
进行第二个版本的转换;这可能会在某个时候发生。
截至目前,更有效的方法是将 Combine 与 AccumulatorFn 一起使用,例如:
我正在使用 Google Cloud Dataflow,并且有一个 ParDo 函数需要访问 PCollection 中的所有元素。为此,我想将 PCollection
第一种方法是创建一个虚拟键,执行 GroupByKey,然后获取值。
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
第二种方法遵循此处的建议:How do I make View's asList() sortable in Google Dataflow SDK? 但没有排序。我创建了一个 View.asList(),创建了一个虚拟 PCollection,然后在虚拟 PCollection 上应用了一个 ParDo 函数,并将视图作为辅助输入并简单地返回了视图。
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
似乎有针对此任务的预定义组合函数,但我找不到。感谢您的帮助!
如果您知道自己需要全部,那么您的两种方法都是合理的。两者都已在 Dataflow SDK 中使用,后来成为 Apache Beam SDK。
- 边输入然后输出整个东西:事实上,这就是
DataflowAssert
的工作方式。在 Beam 中,不同的后端运行器可能会以不同的方式实现侧输入,您应该更喜欢View.asIterable()
,因为它的假设更少,并且可能允许更多的流式传输非常大的侧输入。 - 按单个键分组,然后放下键:这就是 Beam 的继任者
PAssert
的工作方式。它完成同样的事情,需要更多地关注空集合,但更多的 Beam runners 比侧输入支持更好GroupByKey
支持(尤其是当它们是新的并且仍在开发中时)。
所以 View.asIterable()
基本上是为了满足您的要求。还有一些请求 GroupGlobally
进行第二个版本的转换;这可能会在某个时候发生。
截至目前,更有效的方法是将 Combine 与 AccumulatorFn 一起使用,例如: