使用 Apache Beam 读取大量文件的可扩展方式?
Scalable way to read large numbers of files with Apache Beam?
我正在编写一个管道,我需要从 Sentinel2 dataset located on my Google Cloud Bucket with apache_beam.io.ReadFromTextWithFilename
.
中读取元数据文件(500.000 多个文件)
它在一个小子集上工作正常,但是当我 运行 它在整个数据集上它似乎在 "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json')
上阻塞。
它甚至没有出现在 Dataflow 作业列表中。
管道看起来像这样:
with beam.Pipeline(options=pipeline_options) as pipeline:
meta = (
pipeline
| "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json')
| "Extract metadata" >> beam.ParDo(ExtractMetaData())
)
table_spec = bigquery.TableReference(
datasetId="sentinel_2",
tableId="image_labels",
)
(
meta
| "Write To BigQuery" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema(),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
我在想:
- 读取大量文件有没有更聪明的方法?
- 将元数据文件复制到一个文件夹中会不会性能更高?(遍历子文件夹要多花多少钱,而不是一个文件夹中的文件)
- 是不是先用
apache_beam.io.fileio.MatchAll
匹配文件名,然后在ParDo
后面的一两次读取提取?
看来我得了unwanted fusion的病症。
从 Apache Beam 网站上关于 file-processing 的页面中汲取灵感,我尝试在管道中添加一个 Reshuffle
。
我还升级到付费 Google 云帐户,从而获得更高的配额。
这导致 Dataflow 更好地处理工作。
事实上,Dataflow 希望为我的 BiqQuery 写入作业扩展到 251 个工作人员。起初它没有提供更多的工人,所以我停止了工作并坐下 --num_workers=NUM_WORKERS
和 --max_num_workers=NUM_WORKERS
,其中 NUM_WORKERS
是我项目的最大 qouta。当 运行 这些参数时它会自动放大。
我的最终管道如下所示:
with beam.Pipeline(options=pipeline_options) as pipeline:
meta = (
pipeline
| MatchFiles(f'gs://{BUCKET}/{DATA_FOLDER}/*metadata.json')
| ReadMatches()
| beam.Reshuffle()
| beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
| beam.ParDo(ExtractMetaData())
)
table_spec = bigquery.TableReference(
datasetId="sentinel_2",
tableId="image_labels",
)
(
meta
| "Write To BigQuery" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema(),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
)
附录
我也得到提示 SplitableParDos
可能是一个解决方案,但我还没有测试过。
这可能是由于将文本源 glob 拆分为大量源时管道 运行 进入 Dataflow API 限制。
当前的解决方案是使用变换 ReadAllFromText,不应 运行 到此。
将来我们希望通过使用 Splittable DoFn 框架为这种情况更新转换 ReadFromText
。
我正在编写一个管道,我需要从 Sentinel2 dataset located on my Google Cloud Bucket with apache_beam.io.ReadFromTextWithFilename
.
它在一个小子集上工作正常,但是当我 运行 它在整个数据集上它似乎在 "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json')
上阻塞。
它甚至没有出现在 Dataflow 作业列表中。
管道看起来像这样:
with beam.Pipeline(options=pipeline_options) as pipeline:
meta = (
pipeline
| "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json')
| "Extract metadata" >> beam.ParDo(ExtractMetaData())
)
table_spec = bigquery.TableReference(
datasetId="sentinel_2",
tableId="image_labels",
)
(
meta
| "Write To BigQuery" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema(),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
我在想:
- 读取大量文件有没有更聪明的方法?
- 将元数据文件复制到一个文件夹中会不会性能更高?(遍历子文件夹要多花多少钱,而不是一个文件夹中的文件)
- 是不是先用
apache_beam.io.fileio.MatchAll
匹配文件名,然后在ParDo
后面的一两次读取提取?
看来我得了unwanted fusion的病症。
从 Apache Beam 网站上关于 file-processing 的页面中汲取灵感,我尝试在管道中添加一个 Reshuffle
。
我还升级到付费 Google 云帐户,从而获得更高的配额。 这导致 Dataflow 更好地处理工作。
事实上,Dataflow 希望为我的 BiqQuery 写入作业扩展到 251 个工作人员。起初它没有提供更多的工人,所以我停止了工作并坐下 --num_workers=NUM_WORKERS
和 --max_num_workers=NUM_WORKERS
,其中 NUM_WORKERS
是我项目的最大 qouta。当 运行 这些参数时它会自动放大。
我的最终管道如下所示:
with beam.Pipeline(options=pipeline_options) as pipeline:
meta = (
pipeline
| MatchFiles(f'gs://{BUCKET}/{DATA_FOLDER}/*metadata.json')
| ReadMatches()
| beam.Reshuffle()
| beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
| beam.ParDo(ExtractMetaData())
)
table_spec = bigquery.TableReference(
datasetId="sentinel_2",
tableId="image_labels",
)
(
meta
| "Write To BigQuery" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema(),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
)
附录
我也得到提示 SplitableParDos
可能是一个解决方案,但我还没有测试过。
这可能是由于将文本源 glob 拆分为大量源时管道 运行 进入 Dataflow API 限制。
当前的解决方案是使用变换 ReadAllFromText,不应 运行 到此。
将来我们希望通过使用 Splittable DoFn 框架为这种情况更新转换 ReadFromText
。