避免重新计算 Beam Python SDK 中所有云存储文件的大小
Avoid recomputing size of all Cloud Storage files in Beam Python SDK
我正在开发一个从 Google 云存储 (GCS) 目录读取约 500 万个文件的管道。我在 Google Cloud Dataflow 上将其配置为 运行。
问题是当我启动管道时,所有文件需要数小时 "computing the size":
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
如您所见,计算大约5.5M文件的大小需要一个半小时(5549秒),然后从头开始!又过了2个小时运行第二关,第三关开始了!截至撰写本文时,该作业在 Dataflow 控制台中仍然不可用,这让我相信这一切都发生在我的本地机器上,没有利用任何分布式计算。
当我使用较小的输入数据集(2 个文件)测试管道时,它会重复大小估计 4 次:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
按照这个速度,仅对所有 5.5M 文件执行 4 次 GCS 大小估计就需要大约 8 个小时,所有这些都在 Dataflow 作业开始之前。
我的管道配置了 --runner=DataflowRunner
选项,因此它在 Dataflow 中应该是 运行ning:
python bigquery_import.py --runner=DataflowRunner #other options...
管道像这样从 GCS 读取:
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
完整代码请参阅 GitHub 上的 bigquery_import.py。
我很困惑为什么这个乏味的过程发生在数据流环境之外,为什么它需要多次完成。我是从 GCS 正确读取文件还是有更有效的方法?
感谢您报告此事。 Beam 有两个用于阅读文本的转换。 ReadFromText
和 ReadAllFromText
。 ReadFromText
会 运行 解决这个问题,但 ReadAllFromText
不会。
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438
ReadAllFromText
的缺点是它不会执行动态工作重新平衡,但这在读取大量文件时应该不是问题。
创建 https://issues.apache.org/jira/browse/BEAM-9620 用于跟踪 ReadFromText(以及一般基于文件的来源)的问题。
我正在开发一个从 Google 云存储 (GCS) 目录读取约 500 万个文件的管道。我在 Google Cloud Dataflow 上将其配置为 运行。
问题是当我启动管道时,所有文件需要数小时 "computing the size":
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
如您所见,计算大约5.5M文件的大小需要一个半小时(5549秒),然后从头开始!又过了2个小时运行第二关,第三关开始了!截至撰写本文时,该作业在 Dataflow 控制台中仍然不可用,这让我相信这一切都发生在我的本地机器上,没有利用任何分布式计算。
当我使用较小的输入数据集(2 个文件)测试管道时,它会重复大小估计 4 次:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
按照这个速度,仅对所有 5.5M 文件执行 4 次 GCS 大小估计就需要大约 8 个小时,所有这些都在 Dataflow 作业开始之前。
我的管道配置了 --runner=DataflowRunner
选项,因此它在 Dataflow 中应该是 运行ning:
python bigquery_import.py --runner=DataflowRunner #other options...
管道像这样从 GCS 读取:
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
完整代码请参阅 GitHub 上的 bigquery_import.py。
我很困惑为什么这个乏味的过程发生在数据流环境之外,为什么它需要多次完成。我是从 GCS 正确读取文件还是有更有效的方法?
感谢您报告此事。 Beam 有两个用于阅读文本的转换。 ReadFromText
和 ReadAllFromText
。 ReadFromText
会 运行 解决这个问题,但 ReadAllFromText
不会。
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438
ReadAllFromText
的缺点是它不会执行动态工作重新平衡,但这在读取大量文件时应该不是问题。
创建 https://issues.apache.org/jira/browse/BEAM-9620 用于跟踪 ReadFromText(以及一般基于文件的来源)的问题。