从数据流管道写入 BQ 时的动态 table 名称
Dynamic table name when writing to BQ from dataflow pipelines
作为以下问答的后续问题:
我想与 google 数据流工程团队 (@jkff) 确认 Eugene 提出的第三个选项是否完全有可能使用 google 数据流:
"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"
我的理解是ParDo/DoFn会处理每个元素,我们如何在写出一个ParDo/DoFn?
谢谢。
已更新,带有 DoFn,这显然不起作用,因为 c.element()。value 不是 pcollection。
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
this.keysAsSideinputs = keysAsSideinputs;
}
@Override
public void processElement(ProcessContext c) {
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
}
}
BigQueryIO.Write 转换不支持这个。您可以做的最接近的事情是使用每个 window tables,并通过以下方式对 select table 对象中的 table 所需的任何信息进行编码使用自定义 WindowFn。
如果您不想这样做,可以直接从您的 DoFn 调用 BigQuery API。有了这个,您可以将 table 名称设置为您想要的任何内容,由您的代码计算得出。这可以从辅助输入中查找,或者直接从 DoFn 当前正在处理的元素中计算。为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle();
对请求进行批处理
您可以在此处查看 Dataflow 运行程序如何进行流式导入:
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
作为以下问答的后续问题:
我想与 google 数据流工程团队 (@jkff) 确认 Eugene 提出的第三个选项是否完全有可能使用 google 数据流:
"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"
我的理解是ParDo/DoFn会处理每个元素,我们如何在写出一个ParDo/DoFn?
谢谢。
已更新,带有 DoFn,这显然不起作用,因为 c.element()。value 不是 pcollection。
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
this.keysAsSideinputs = keysAsSideinputs;
}
@Override
public void processElement(ProcessContext c) {
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
}
}
BigQueryIO.Write 转换不支持这个。您可以做的最接近的事情是使用每个 window tables,并通过以下方式对 select table 对象中的 table 所需的任何信息进行编码使用自定义 WindowFn。
如果您不想这样做,可以直接从您的 DoFn 调用 BigQuery API。有了这个,您可以将 table 名称设置为您想要的任何内容,由您的代码计算得出。这可以从辅助输入中查找,或者直接从 DoFn 当前正在处理的元素中计算。为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle();
对请求进行批处理您可以在此处查看 Dataflow 运行程序如何进行流式导入: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java