如何在 Dataflow 中计算两个 PCollections 的笛卡尔积?
How to do a cartesian product of two PCollections in Dataflow?
我想计算两个 PCollection 的笛卡尔积。 PCollection 都不能放入内存,所以做边输入是不可行的。
我的目标是:我有两个数据集。一是体积小的元素多。另一个是很少(~10)个非常大的。我想取这两个元素的乘积,然后生成键值对象。
我认为 CoGroupByKey 可能适用于您的情况:
https://cloud.google.com/dataflow/model/group-by-key#join
这就是我为类似用例所做的。虽然我的可能没有受到内存的限制(你试过更大的集群和更大的机器吗?):
PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified
.apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow()));
PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p
[...]
.apply(GroupByKey.create());
因此这些集合由相同的密钥键入。
然后我声明了标签:
final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>();
final TupleTag<TableRow> actualsTag = new TupleTag<>();
合并它们:
PCollection<KV<String, CoGbkResult>> actualCategoriesCombined =
KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed)
.and(categoryTag, groupedCategories)
.apply(CoGroupByKey.create());
在我的例子中,最后一步 - 重新格式化结果(来自连续流中的标记组:
actualCategoriesCombined.apply(ParDo.named("Actuals : Formatting").of(
new DoFn<KV<String, CoGbkResult>, TableRow>() {
@Override
public void processElement(ProcessContext c) throws Exception {
KV<String, CoGbkResult> e = c.element();
Iterable<TableRow> actualTableRows =
e.getValue().getAll(actualsTag);
Iterable<Iterable<Map<String, String>>> categoriesAll =
e.getValue().getAll(categoryTag);
for (TableRow row : actualTableRows) {
// Some of the actuals do not have categories
if (categoriesAll.iterator().hasNext()) {
row.put("advertiser", categoriesAll.iterator().next());
}
c.output(row);
}
}
}))
希望这对您有所帮助。再次 - 不确定内存限制。如果你尝试这种方法,请告诉结果。
创建笛卡尔积使用 Apache Beam extension Join
import org.apache.beam.sdk.extensions.joinlibrary.Join;
...
// Use function Join.fullOuterJoin(final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection, final V1 leftNullValue, final V2 rightNullValue)
// and the same key for all rows to create cartesian product as it is shown below:
public static void process(Pipeline pipeline, DataInputOptions options) {
PCollection<KV<Integer, CpuItem>> cpuList = pipeline
.apply("ReadCPUs", TextIO.read().from(options.getInputCpuFile()))
.apply("Creating Cpu Objects", new CpuItem()).apply("Preprocess Cpu",
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(CpuItem.class)))
.via((CpuItem e) -> KV.of(0, e)));
PCollection<KV<Integer, GpuItem>> gpuList = pipeline
.apply("ReadGPUs", TextIO.read().from(options.getInputGpuFile()))
.apply("Creating Gpu Objects", new GpuItem()).apply("Preprocess Gpu",
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(GpuItem.class)))
.via((GpuItem e) -> KV.of(0, e)));
PCollection<KV<Integer,KV<CpuItem,GpuItem>>> cartesianProduct = Join.fullOuterJoin(cpuList, gpuList, new CpuItem(), new GpuItem());
PCollection<String> finalResultCollection = cartesianProduct.apply("Format results", MapElements.into(TypeDescriptors.strings())
.via((KV<Integer, KV<CpuItem,GpuItem>> e) -> e.getValue().toString()));
finalResultCollection.apply("Output the results",
TextIO.write().to("fps.batchproc\parsed_cpus").withSuffix(".log"));
pipeline.run();
}
在上面的代码中这一行
...
.via((CpuItem e) -> KV.of(0, e)));
...
我为输入数据中可用的所有行创建键等于 0 的映射。结果所有行都匹配。那等于 SQL 不带 WHERE 子句的表达式 JOIN
我想计算两个 PCollection 的笛卡尔积。 PCollection 都不能放入内存,所以做边输入是不可行的。
我的目标是:我有两个数据集。一是体积小的元素多。另一个是很少(~10)个非常大的。我想取这两个元素的乘积,然后生成键值对象。
我认为 CoGroupByKey 可能适用于您的情况:
https://cloud.google.com/dataflow/model/group-by-key#join
这就是我为类似用例所做的。虽然我的可能没有受到内存的限制(你试过更大的集群和更大的机器吗?):
PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified
.apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow()));
PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p
[...]
.apply(GroupByKey.create());
因此这些集合由相同的密钥键入。
然后我声明了标签:
final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>();
final TupleTag<TableRow> actualsTag = new TupleTag<>();
合并它们:
PCollection<KV<String, CoGbkResult>> actualCategoriesCombined =
KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed)
.and(categoryTag, groupedCategories)
.apply(CoGroupByKey.create());
在我的例子中,最后一步 - 重新格式化结果(来自连续流中的标记组:
actualCategoriesCombined.apply(ParDo.named("Actuals : Formatting").of(
new DoFn<KV<String, CoGbkResult>, TableRow>() {
@Override
public void processElement(ProcessContext c) throws Exception {
KV<String, CoGbkResult> e = c.element();
Iterable<TableRow> actualTableRows =
e.getValue().getAll(actualsTag);
Iterable<Iterable<Map<String, String>>> categoriesAll =
e.getValue().getAll(categoryTag);
for (TableRow row : actualTableRows) {
// Some of the actuals do not have categories
if (categoriesAll.iterator().hasNext()) {
row.put("advertiser", categoriesAll.iterator().next());
}
c.output(row);
}
}
}))
希望这对您有所帮助。再次 - 不确定内存限制。如果你尝试这种方法,请告诉结果。
创建笛卡尔积使用 Apache Beam extension Join
import org.apache.beam.sdk.extensions.joinlibrary.Join;
...
// Use function Join.fullOuterJoin(final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection, final V1 leftNullValue, final V2 rightNullValue)
// and the same key for all rows to create cartesian product as it is shown below:
public static void process(Pipeline pipeline, DataInputOptions options) {
PCollection<KV<Integer, CpuItem>> cpuList = pipeline
.apply("ReadCPUs", TextIO.read().from(options.getInputCpuFile()))
.apply("Creating Cpu Objects", new CpuItem()).apply("Preprocess Cpu",
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(CpuItem.class)))
.via((CpuItem e) -> KV.of(0, e)));
PCollection<KV<Integer, GpuItem>> gpuList = pipeline
.apply("ReadGPUs", TextIO.read().from(options.getInputGpuFile()))
.apply("Creating Gpu Objects", new GpuItem()).apply("Preprocess Gpu",
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(GpuItem.class)))
.via((GpuItem e) -> KV.of(0, e)));
PCollection<KV<Integer,KV<CpuItem,GpuItem>>> cartesianProduct = Join.fullOuterJoin(cpuList, gpuList, new CpuItem(), new GpuItem());
PCollection<String> finalResultCollection = cartesianProduct.apply("Format results", MapElements.into(TypeDescriptors.strings())
.via((KV<Integer, KV<CpuItem,GpuItem>> e) -> e.getValue().toString()));
finalResultCollection.apply("Output the results",
TextIO.write().to("fps.batchproc\parsed_cpus").withSuffix(".log"));
pipeline.run();
}
在上面的代码中这一行
...
.via((CpuItem e) -> KV.of(0, e)));
...
我为输入数据中可用的所有行创建键等于 0 的映射。结果所有行都匹配。那等于 SQL 不带 WHERE 子句的表达式 JOIN