使用 Apache Beam 读取大量文件的可扩展方式?

Scalable way to read large numbers of files with Apache Beam?

我正在编写一个管道,我需要从 Sentinel2 dataset located on my Google Cloud Bucket with apache_beam.io.ReadFromTextWithFilename.

中读取元数据文件(500.000 多个文件)

它在一个小子集上工作正常,但是当我 运行 它在整个数据集上它似乎在 "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json') 上阻塞。

它甚至没有出现在 Dataflow 作业列表中。

管道看起来像这样:

with beam.Pipeline(options=pipeline_options) as pipeline:
    meta = (
        pipeline
        | "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/{DATA_FOLDER}/**/*metadata.json')
        | "Extract metadata" >> beam.ParDo(ExtractMetaData())
    )
    table_spec = bigquery.TableReference(
        datasetId="sentinel_2",
        tableId="image_labels",
    )
    (
        meta 
        | "Write To BigQuery" >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema(),
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )
    )

我在想:

  1. 读取大量文件有没有更聪明的方法?
  2. 将元数据文件复制到一个文件夹中会不会性能更高?(遍历子文件夹要多花多少钱,而不是一个文件夹中的文件)
  3. 是不是先用apache_beam.io.fileio.MatchAll匹配文件名,然后在ParDo后面的一两次读取提取?

看来我得了unwanted fusion的病症。

从 Apache Beam 网站上关于 file-processing 的页面中汲取灵感,我尝试在管道中添加一个 Reshuffle

我还升级到付费 Google 云帐户,从而获得更高的配额。 这导致 Dataflow 更好地处理工作。

事实上,Dataflow 希望为我的 BiqQuery 写入作业扩展到 251 个工作人员。起初它没有提供更多的工人,所以我停止了工作并坐下 --num_workers=NUM_WORKERS--max_num_workers=NUM_WORKERS,其中 NUM_WORKERS 是我项目的最大 qouta。当 运行 这些参数时它会自动放大。

我的最终管道如下所示:

with beam.Pipeline(options=pipeline_options) as pipeline:

          meta = (
              pipeline
              | MatchFiles(f'gs://{BUCKET}/{DATA_FOLDER}/*metadata.json')
              | ReadMatches()
              | beam.Reshuffle()
              | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
              | beam.ParDo(ExtractMetaData())
          )


          table_spec = bigquery.TableReference(
              datasetId="sentinel_2",
              tableId="image_labels",
          )

          (
              meta 
              | "Write To BigQuery" >> beam.io.WriteToBigQuery(
                  table_spec,
                  schema=table_schema(),
                  write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
              )
          )

附录

我也得到提示 SplitableParDos 可能是一个解决方案,但我还没有测试过。

这可能是由于将文本源 glob 拆分为大量源时管道 运行 进入 Dataflow API 限制。

当前的解决方案是使用变换 ReadAllFromText,不应 运行 到此。

将来我们希望通过使用 Splittable DoFn 框架为这种情况更新转换 ReadFromText