使用 DataFlow 将不同方案的多个文件加载到 BigQuery

Loading multiple files of different scheme to BigQuery using DataFlow

我有一组 .txt 文件,其中包含具有不同架构的 JSON 格式的数据,所有这些文件都将加载到一个通用的 bigquery table。文件格式很简单,一组文件有4列,有的有5列,有的5列相同但顺序不同...

例如:文件夹 1 中的文件'{"name":"John", "age":31, "city":"New York"}'

文件夹 2 中的文件'{"name":"Tom", "age":32, "city":"New York", "zip":"12345"}'

文件夹 3 中的文件'{"name":"Janice", "age":31, "zip":"12345", "city":"New York"}'

文件夹 4 中的文件'{"name":"Alice", "age":34, "zip":"12348"}'

文件夹 5 中的文件'{"name":"Rob", "age":31, "zip":"12345", "Phone":"1234567891"}'

BQ table 将包含列、姓名、年龄、城市、邮政编码、phone。

每种类型的文件都在GCS中的不同文件夹中,每个文件夹中的文件是一致的,在GCS bucket子文件夹中所有文件都是相同类型,相同数量和列顺序。

我能够将这些文件单独加载到相应的 BQ tables,但我需要通过为 'not applicable' 列加载 NULL 将它们加载到单个 table。

寻求建议以批处理方式使用数据流加载数据到BQ。我对此很陌生,感谢您的帮助。

Cloud Storage Text to BigQuery,一个 Google-provided 模板,可用于您的用例。

如文档中所述,请创建 BigQuery 架构文件,该文件结合了所有已知架构和 JavaScript 文件来处理任何缺失的字段(BigQuery 可能能够自动将缺失值替换为空值,但我还没有测试过)。

您可以按照此 doc 启动模板作业。


更新:根据评论,您似乎有兴趣创建自己的管道。这就是您从 GCS 读取数据并写入 BigQuery 的方式。

schema="name:STRING,age:INTEGER,zip:STRING,phone:STRING,city:STRING"

_ = (p
    | "read messages" >> beam.io.ReadFromText('gs://path/to/files/*.txt')
    | "parse" >> beam.Map(json.loads)
    | 'write to bigquery' >> beam.io.WriteToBigQuery('project:datasetId.tableId', schema=schema)
)

未指定的值自动保存为nulls。

我就是这样保存数据的。

$ gsutil cat gs://path/to/files/example.txt
{"name":"John", "age":31, "city":"New York"}