从数据流管道写入 BQ 时的动态 table 名称

Dynamic table name when writing to BQ from dataflow pipelines

作为以下问答的后续问题:

我想与 google 数据流工程团队 (@jkff) 确认 Eugene 提出的第三个选项是否完全有可能使用 google 数据流:

"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"

我的理解是ParDo/DoFn会处理每个元素,我们如何在写出一个ParDo/DoFn?

谢谢。

已更新,带有 DoFn,这显然不起作用,因为 c.element()。value 不是 pcollection。

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

@Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }    
}    

BigQueryIO.Write 转换不支持这个。您可以做的最接近的事情是使用每个 window tables,并通过以下方式对 select table 对象中的 table 所需的任何信息进行编码使用自定义 WindowFn。

如果您不想这样做,可以直接从您的 DoFn 调用 BigQuery API。有了这个,您可以将 table 名称设置为您想要的任何内容,由您的代码计算得出。这可以从辅助输入中查找,或者直接从 DoFn 当前正在处理的元素中计算。为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle();

对请求进行批处理

您可以在此处查看 Dataflow 运行程序如何进行流式导入: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java