使用 DataFlow 将不同方案的多个文件加载到 BigQuery
Loading multiple files of different scheme to BigQuery using DataFlow
我有一组 .txt 文件,其中包含具有不同架构的 JSON 格式的数据,所有这些文件都将加载到一个通用的 bigquery table。文件格式很简单,一组文件有4列,有的有5列,有的5列相同但顺序不同...
例如:文件夹 1 中的文件'{"name":"John", "age":31, "city":"New York"}'
文件夹 2 中的文件'{"name":"Tom", "age":32, "city":"New York", "zip":"12345"}'
文件夹 3 中的文件'{"name":"Janice", "age":31, "zip":"12345", "city":"New York"}'
文件夹 4 中的文件'{"name":"Alice", "age":34, "zip":"12348"}'
文件夹 5 中的文件'{"name":"Rob", "age":31, "zip":"12345", "Phone":"1234567891"}'
BQ table 将包含列、姓名、年龄、城市、邮政编码、phone。
每种类型的文件都在GCS中的不同文件夹中,每个文件夹中的文件是一致的,在GCS bucket子文件夹中所有文件都是相同类型,相同数量和列顺序。
我能够将这些文件单独加载到相应的 BQ tables,但我需要通过为 'not applicable' 列加载 NULL 将它们加载到单个 table。
寻求建议以批处理方式使用数据流加载数据到BQ。我对此很陌生,感谢您的帮助。
Cloud Storage Text to BigQuery,一个 Google-provided 模板,可用于您的用例。
如文档中所述,请创建 BigQuery 架构文件,该文件结合了所有已知架构和 JavaScript 文件来处理任何缺失的字段(BigQuery 可能能够自动将缺失值替换为空值,但我还没有测试过)。
您可以按照此 doc 启动模板作业。
更新:根据评论,您似乎有兴趣创建自己的管道。这就是您从 GCS 读取数据并写入 BigQuery 的方式。
schema="name:STRING,age:INTEGER,zip:STRING,phone:STRING,city:STRING"
_ = (p
| "read messages" >> beam.io.ReadFromText('gs://path/to/files/*.txt')
| "parse" >> beam.Map(json.loads)
| 'write to bigquery' >> beam.io.WriteToBigQuery('project:datasetId.tableId', schema=schema)
)
未指定的值自动保存为null
s。
我就是这样保存数据的。
$ gsutil cat gs://path/to/files/example.txt
{"name":"John", "age":31, "city":"New York"}
我有一组 .txt 文件,其中包含具有不同架构的 JSON 格式的数据,所有这些文件都将加载到一个通用的 bigquery table。文件格式很简单,一组文件有4列,有的有5列,有的5列相同但顺序不同...
例如:文件夹 1 中的文件'{"name":"John", "age":31, "city":"New York"}'
文件夹 2 中的文件'{"name":"Tom", "age":32, "city":"New York", "zip":"12345"}'
文件夹 3 中的文件'{"name":"Janice", "age":31, "zip":"12345", "city":"New York"}'
文件夹 4 中的文件'{"name":"Alice", "age":34, "zip":"12348"}'
文件夹 5 中的文件'{"name":"Rob", "age":31, "zip":"12345", "Phone":"1234567891"}'
BQ table 将包含列、姓名、年龄、城市、邮政编码、phone。
每种类型的文件都在GCS中的不同文件夹中,每个文件夹中的文件是一致的,在GCS bucket子文件夹中所有文件都是相同类型,相同数量和列顺序。
我能够将这些文件单独加载到相应的 BQ tables,但我需要通过为 'not applicable' 列加载 NULL 将它们加载到单个 table。
寻求建议以批处理方式使用数据流加载数据到BQ。我对此很陌生,感谢您的帮助。
Cloud Storage Text to BigQuery,一个 Google-provided 模板,可用于您的用例。
如文档中所述,请创建 BigQuery 架构文件,该文件结合了所有已知架构和 JavaScript 文件来处理任何缺失的字段(BigQuery 可能能够自动将缺失值替换为空值,但我还没有测试过)。
您可以按照此 doc 启动模板作业。
更新:根据评论,您似乎有兴趣创建自己的管道。这就是您从 GCS 读取数据并写入 BigQuery 的方式。
schema="name:STRING,age:INTEGER,zip:STRING,phone:STRING,city:STRING"
_ = (p
| "read messages" >> beam.io.ReadFromText('gs://path/to/files/*.txt')
| "parse" >> beam.Map(json.loads)
| 'write to bigquery' >> beam.io.WriteToBigQuery('project:datasetId.tableId', schema=schema)
)
未指定的值自动保存为null
s。
我就是这样保存数据的。
$ gsutil cat gs://path/to/files/example.txt
{"name":"John", "age":31, "city":"New York"}