如何在 BigQueryIO.Write 之前读取和转换 CSV Headers?
How do I read and transform CSV Headers before BigQueryIO.Write?
我有一个 csv 文件,其中包含 headers 作为第一行。我正在阅读它并清理那些 headers 以匹配 BigQuery 列要求。但是我需要在管道开始 之前 引用架构。允许 BigQueryIO.Write 以这种方式响应 headers 的最佳做法是什么?目前我的代码看起来像这样:
//create table
Table table = new Table();
// Where logically should the following line go?
TableSchema customSchema = ?
table.setSchema(customSchema);
TableReference tableRef = new TableReference();
tableRef.setDatasetId("foo_dataset");
tableRef.setProjectId("bar_project");
tableRef.setTableId("baz_table");
table.setTableReference(tableRef);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
// Detect if it's header row
.apply(ParDo.of(new ExtractHeader()))
.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
我目前正在尝试实施两个管道,(大致)如下所示,但数据流中的执行顺序不可靠,因此我在 BQ table 不存在的地方遇到错误。
PCollection readIn = p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
.apply(ParDo.of(new ExtractHeader()));
TableSchema customSchema = /* generate schema based on what I now know the headers are */
readIn.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
此功能(动态架构)目前正在审核中 https://github.com/apache/beam/pull/2609(我正在审核中)。您可以尝试进行中的 PR,但请注意其 API 可能会因审查而有所改变。提交 PR 后我会更新此答案。
我有一个 csv 文件,其中包含 headers 作为第一行。我正在阅读它并清理那些 headers 以匹配 BigQuery 列要求。但是我需要在管道开始 之前 引用架构。允许 BigQueryIO.Write 以这种方式响应 headers 的最佳做法是什么?目前我的代码看起来像这样:
//create table
Table table = new Table();
// Where logically should the following line go?
TableSchema customSchema = ?
table.setSchema(customSchema);
TableReference tableRef = new TableReference();
tableRef.setDatasetId("foo_dataset");
tableRef.setProjectId("bar_project");
tableRef.setTableId("baz_table");
table.setTableReference(tableRef);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
// Detect if it's header row
.apply(ParDo.of(new ExtractHeader()))
.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
我目前正在尝试实施两个管道,(大致)如下所示,但数据流中的执行顺序不可靠,因此我在 BQ table 不存在的地方遇到错误。
PCollection readIn = p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
.apply(ParDo.of(new ExtractHeader()));
TableSchema customSchema = /* generate schema based on what I now know the headers are */
readIn.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
此功能(动态架构)目前正在审核中 https://github.com/apache/beam/pull/2609(我正在审核中)。您可以尝试进行中的 PR,但请注意其 API 可能会因审查而有所改变。提交 PR 后我会更新此答案。