使用 google 云数据流读取多个镶木地板文件时如何判断记录来自哪个文件
How to tell which file a record came from when reading multiple parquet files with google cloud dataflow
我需要能够将沿袭追溯到单个 parquet 文件,并且能够执行批量加载,比如在数据流中发现缺陷时重播几年的 parquet 文件.
经过多次尝试,以下方法适用于批量加载,其中 options.input 是 RuntimeValueProvider 而 SplitFn 只是 yields
str.split()
:
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
mainPipe = p \
| 'CSV of URIs' >> beam.Create([options.input]) \
| 'Split URIs into records' >> beam.ParDo(SplitFn(',')) \
| "Read Parquet" >> beam.io.parquetio.ReadAllFromParquet(columns=[k for k in fields.keys()])
不幸的是beam.io.parquetio.ReadAllFromParquet不会说每条记录来自哪个文件,ReadFromParquet
也不会,parquetio
唯一的其他功能。
如果不离开 Google Cloud Dataflow 或教授团队 Java,我能做些什么来一次将多个 parquet 文件加载到 BigQuery 中并知道每条记录来自哪个文件?
鉴于当前 API 我没有看到针对此的预制解决方案。尽管您可以通过以下任一方式解决问题:
- Extending/modifying ReadAllFromParquet 将文件名附加到输出。
- 使用 BQ tools 从 parquet 导入。我不确定他们是否有完全相同的场景。
我需要能够将沿袭追溯到单个 parquet 文件,并且能够执行批量加载,比如在数据流中发现缺陷时重播几年的 parquet 文件.
经过多次尝试,以下方法适用于批量加载,其中 options.input 是 RuntimeValueProvider 而 SplitFn 只是 yields
str.split()
:
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
mainPipe = p \
| 'CSV of URIs' >> beam.Create([options.input]) \
| 'Split URIs into records' >> beam.ParDo(SplitFn(',')) \
| "Read Parquet" >> beam.io.parquetio.ReadAllFromParquet(columns=[k for k in fields.keys()])
不幸的是beam.io.parquetio.ReadAllFromParquet不会说每条记录来自哪个文件,ReadFromParquet
也不会,parquetio
唯一的其他功能。
如果不离开 Google Cloud Dataflow 或教授团队 Java,我能做些什么来一次将多个 parquet 文件加载到 BigQuery 中并知道每条记录来自哪个文件?
鉴于当前 API 我没有看到针对此的预制解决方案。尽管您可以通过以下任一方式解决问题:
- Extending/modifying ReadAllFromParquet 将文件名附加到输出。
- 使用 BQ tools 从 parquet 导入。我不确定他们是否有完全相同的场景。