使用消息数据上传到大查询 - google 数据流

upload to big query using message data - google data flow

我正在通过 pub-sub 接收消息,并希望使用消息数据上传到 big-query 以确定 table 将数据上传到哪个目标。

我尝试执行以下操作:

流水线流水线=Pipeline.create(选项); 字符串 bigQueryTable;

PCollection<String> input = pipeline
        .apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));

input.apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
        JSONObject firstJSONObject = new JSONObject(c.element());
         bigQueryTable = firstJSONObject.get("tableName").toString();

         TableRow tableRow = convertJsonToTableRow(firstJSONObject);  
        c.output(tableRow);

    }

})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));

有没有不用自己写 DOFN 的方法?

如果我确实需要实现自己的 doFn,如何实现它以上传到大查询?

目前这还不能直接实现,但有各种解决方法涵盖了一些潜在的用例。查看相关问题: