使用消息数据上传到大查询 - google 数据流
upload to big query using message data - google data flow
我正在通过 pub-sub 接收消息,并希望使用消息数据上传到 big-query 以确定 table 将数据上传到哪个目标。
我尝试执行以下操作:
流水线流水线=Pipeline.create(选项);
字符串 bigQueryTable;
PCollection<String> input = pipeline
.apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));
input.apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
JSONObject firstJSONObject = new JSONObject(c.element());
bigQueryTable = firstJSONObject.get("tableName").toString();
TableRow tableRow = convertJsonToTableRow(firstJSONObject);
c.output(tableRow);
}
})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));
有没有不用自己写 DOFN 的方法?
如果我确实需要实现自己的 doFn,如何实现它以上传到大查询?
目前这还不能直接实现,但有各种解决方法涵盖了一些潜在的用例。查看相关问题:
我正在通过 pub-sub 接收消息,并希望使用消息数据上传到 big-query 以确定 table 将数据上传到哪个目标。
我尝试执行以下操作:
流水线流水线=Pipeline.create(选项); 字符串 bigQueryTable;
PCollection<String> input = pipeline
.apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));
input.apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
JSONObject firstJSONObject = new JSONObject(c.element());
bigQueryTable = firstJSONObject.get("tableName").toString();
TableRow tableRow = convertJsonToTableRow(firstJSONObject);
c.output(tableRow);
}
})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));
有没有不用自己写 DOFN 的方法?
如果我确实需要实现自己的 doFn,如何实现它以上传到大查询?
目前这还不能直接实现,但有各种解决方法涵盖了一些潜在的用例。查看相关问题: