通用云数据流模式——有更好的方法吗?
Common Cloud Dataflow pattern - is there a better way?
我们发现自己经常在 Dataflow 中使用以下模式:
- 从 BigQuery TableRow
执行键提取 ParDo
- 对 1
的结果执行 GroupByKey
- 对 2
的结果执行扁平化 ParDo
Dataflow 中是否有一个操作可以一次性实现这一点(至少从 API 的角度来看)?
我看过 Combine 操作,但它似乎更适合在计算值时使用,例如sums/averages等
你的问题没有太多细节,我只能给出一般性的建议。
您可以创建一个 PTransform
将上述模式组合成一个复合转换。这允许您将经常使用的操作放在一个单一的可重用组件中。
下面的代码应该让您明白我的意思:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
Object key = row.get("key");
if (key != null) {
c.output(KV.of(key.toString(), row));
}
}
}
class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
public CompositeTransform(String name) {
super(name);
}
public static CompositeTransform named(String name) {
return new CompositeTransform(name);
}
@Override
public PCollection<TableRow> apply(PCollection<TableRow> input) {
return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
.apply(GroupByKey.create())
// potentially more transformations
.apply(Values.create()) // get only the values ( because we have a kv )
.apply(Flatten.iterables()); // flatten them out
}
}
public class Main {
public static void run(PipelineOptions options) {
Pipeline p = Pipeline.create(options);
// read input
p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))
// apply fancy transform
.apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))
// write output
.apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));
p.run();
}
}
我们发现自己经常在 Dataflow 中使用以下模式:
- 从 BigQuery TableRow 执行键提取
- 对 1 的结果执行
- 对 2 的结果执行扁平化
ParDo
GroupByKey
ParDo
Dataflow 中是否有一个操作可以一次性实现这一点(至少从 API 的角度来看)?
我看过 Combine 操作,但它似乎更适合在计算值时使用,例如sums/averages等
你的问题没有太多细节,我只能给出一般性的建议。
您可以创建一个 PTransform
将上述模式组合成一个复合转换。这允许您将经常使用的操作放在一个单一的可重用组件中。
下面的代码应该让您明白我的意思:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
Object key = row.get("key");
if (key != null) {
c.output(KV.of(key.toString(), row));
}
}
}
class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
public CompositeTransform(String name) {
super(name);
}
public static CompositeTransform named(String name) {
return new CompositeTransform(name);
}
@Override
public PCollection<TableRow> apply(PCollection<TableRow> input) {
return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
.apply(GroupByKey.create())
// potentially more transformations
.apply(Values.create()) // get only the values ( because we have a kv )
.apply(Flatten.iterables()); // flatten them out
}
}
public class Main {
public static void run(PipelineOptions options) {
Pipeline p = Pipeline.create(options);
// read input
p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))
// apply fancy transform
.apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))
// write output
.apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));
p.run();
}
}