Apache Beam 管道摄取 "Big" 输入文件(超过 1GB)不创建任何输出文件
apache beam pipeline ingesting "Big" input file (more than 1GB) doesn't create any output file
关于计算的数据流模型,我正在做一个 PoC 来测试一些概念,使用 apache beam 和 direct-runner(和 java sdk)。我在创建一个读取“大”csv 文件(大约 1.25GB)并将其转储到输出文件中而没有任何特定转换的管道时遇到了问题,如以下代码(我主要关注使用此测试 IO 瓶颈 dataflow/beam 模型,因为这对我来说最重要):
// Example 1 reading and writing to a file
Pipeline pipeline = Pipeline.create();
PCollection<String> output = ipeline
.apply(TextIO.read().from("BIG_CSV_FILE"));
output.apply(
TextIO
.write()
.to("BIG_OUTPUT")
.withSuffix("csv").withNumShards(1));
pipeline.run();
我遇到的问题是只有较小的文件可以工作,但是当使用大文件时,没有生成输出文件(而且也没有显示 error/exception,这使得调试更难)。
我知道在 apache-beam 项目 (https://beam.apache.org/documentation/runners/direct/) 的 runners 页面上,在内存注意事项下明确说明:
Local execution is limited by the memory available in your local environment. It is highly recommended that you run your pipeline with
data sets small enough to fit in local memory. You can create a small
in-memory data set using a Create transform, or you can use a Read
transform to work with small local or remote files.
以上表明我遇到了内存问题(但遗憾的是控制台上没有明确说明,所以我只是想知道)。我也很关心他们关于数据集应该装入内存的建议(为什么它不从文件中读取部分而不是将整个 file/dataset 装入内存?)
我还想在此对话中添加的第二个考虑因素是(以防这确实是内存问题):直接运行器的实现有多基础?我的意思是,实现一段从大文件中分块读取并输出到新文件(也分块)的代码并不难,因此内存使用在任何时候都不会成为问题(因为这两个文件都没有完全加载到内存中 - 只有当前的“块”)。即使“直接运行器”更像是一个测试语义的原型运行器,期望它能很好地处理大文件会不会太过分了? - 考虑到这是一个统一的模型,用于处理流式传输,其中 window 大小是任意的,巨大的数据 accumulation/aggregation 在下沉之前是一个标准用例。
所以我非常感谢您 feedback/comments 关于以下任何一点的问题:您是否注意到使用直接运行器的 IO 限制?是我忽略了某些方面还是直接执行者真的如此天真地实施?您是否通过使用像 flink/spark/google 云数据流这样的合适的生产运行器来验证此约束消失了?
我最终会与其他跑步者一起测试,如 flink 或 spark one,但直接跑步者(即使它仅用于原型设计目的)在第一次测试时遇到问题我感到很失望我' m 运行 on - 考虑到整个数据流理念是基于在统一 batch/streaming 模型的保护下摄取、处理、分组和分发大量数据。
编辑(反映 Kenn 的反馈):
Kenn,感谢您提供的宝贵意见和反馈,它们对我指向相关文档提供了很大帮助。根据你的建议,我通过分析应用程序发现问题确实是一个 java 堆相关的问题(不知何故从未在普通控制台上显示 - 并且只在分析器上看到)。尽管该文件的大小“仅”为 1.25GB,但在转储堆之前内部使用量超过了 4GB,这表明直接运行程序不是“按块工作”,而是确实将所有内容加载到内存中(正如他们的文档所说)。
关于你的观点:
1- 我相信序列化和改组仍然可以通过“逐块”实现来很好地实现。也许我对直跑者应该有什么能力抱有错误的期望,或者我没有完全掌握它的预期范围,现在我将避免在使用直跑者时进行非功能性类型的测试。
2 - 关于分片。我相信 NumOfShards 在写入阶段控制并行度(和输出文件的数量)(在此之前的处理应该仍然是完全并行的,并且只有在编写时,它才会使用尽可能多的工作人员 - 并生成尽可能多的文件 -明确规定)。相信这一点的两个原因是:首先,CPU 分析器总是显示 8 个忙碌的“direct-runner-workers”——反映我的 PC 拥有的逻辑核心数量——独立于我设置 1 个分片还是 N 个分片.第二个原因是我从这里的文档中了解到的 (https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/WriteFiles.html) :
By default, every bundle in the input PCollection will be processed by
a FileBasedSink.WriteOperation, so the number of output will vary
based on runner behavior, though at least 1 output will always be
produced. The exact parallelism of the write stage can be controlled
using withNumShards(int), typically used to control how many files
are produced or to globally limit the number of workers connecting to
an external service. However, this option can often hurt performance:
it adds an additional GroupByKey to the pipeline.
这里有一件有趣的事情是“将额外的 GroupByKey 添加到管道”在我的用例中有点不受欢迎(我只希望在 1 个文件中得到结果,而不考虑顺序或分组),
因此,在生成 N 个分片输出文件之后,可能添加一个额外的“展平”文件步骤是一个更好的方法。
3 - 您的分析建议很准确,谢谢。
最终编辑 直接运行器不用于性能测试,仅用于原型设计和数据的良好形成。它没有任何按分区拆分和划分工作的机制,并且处理内存中的所有内容
存在一些问题或可能性。我会按优先顺序回答。
- 直接运行器用于测试非常小的数据。它专为最大质量保证而设计,性能不是最重要的。例如:
- 它随机打乱数据以确保您不依赖于生产中不存在的顺序
- 它会在每一步之后对数据进行序列化和反序列化,以确保数据能够正确传输(生产运行者会尽可能避免序列化)
- 它会检查您是否以禁止的方式改变了元素,这会导致您在生产中丢失数据
你描述的数据不是很大,DirectRunner在正常情况下最终还是可以处理的。
您指定了 numShards(1)
,它明确地消除了所有并行性。它会导致所有数据在单个线程中组合和处理,因此即使在 DirectRunner 上,它也会比它可能的要慢。通常,您会希望避免人为地限制并行度。
如果有任何内存不足错误或其他阻止处理的错误,您应该会看到很多消息。否则,查看分析和 CPU 利用率以确定处理是否处于活动状态将很有帮助。
上面的 Kenn Knowles 已经间接回答了这个问题。直接运行器不用于性能测试,仅用于原型设计和数据的良好形成。它没有任何按分区拆分和划分工作的机制,并处理内存中的每个数据集。应使用其他运行器(如 Flink Runner)进行性能测试 - 这些将提供数据拆分和处理高 IO 瓶颈所需的基础设施类型。
更新: 添加到这个问题的要点,这里有一个相关的问题:
而这里的问题围绕着弄清楚直接跑步者是否可以处理巨大的数据集(我们已经在这里确定这是不可能的);上面提供的 link 指向天气生产运行器的讨论(如 flink/spark/cloud 数据流)可以开箱即用地处理大量数据集(简短的回答是肯定的,但请检查自己 link 进行更深入的讨论)。
关于计算的数据流模型,我正在做一个 PoC 来测试一些概念,使用 apache beam 和 direct-runner(和 java sdk)。我在创建一个读取“大”csv 文件(大约 1.25GB)并将其转储到输出文件中而没有任何特定转换的管道时遇到了问题,如以下代码(我主要关注使用此测试 IO 瓶颈 dataflow/beam 模型,因为这对我来说最重要):
// Example 1 reading and writing to a file
Pipeline pipeline = Pipeline.create();
PCollection<String> output = ipeline
.apply(TextIO.read().from("BIG_CSV_FILE"));
output.apply(
TextIO
.write()
.to("BIG_OUTPUT")
.withSuffix("csv").withNumShards(1));
pipeline.run();
我遇到的问题是只有较小的文件可以工作,但是当使用大文件时,没有生成输出文件(而且也没有显示 error/exception,这使得调试更难)。
我知道在 apache-beam 项目 (https://beam.apache.org/documentation/runners/direct/) 的 runners 页面上,在内存注意事项下明确说明:
Local execution is limited by the memory available in your local environment. It is highly recommended that you run your pipeline with data sets small enough to fit in local memory. You can create a small in-memory data set using a Create transform, or you can use a Read transform to work with small local or remote files.
以上表明我遇到了内存问题(但遗憾的是控制台上没有明确说明,所以我只是想知道)。我也很关心他们关于数据集应该装入内存的建议(为什么它不从文件中读取部分而不是将整个 file/dataset 装入内存?)
我还想在此对话中添加的第二个考虑因素是(以防这确实是内存问题):直接运行器的实现有多基础?我的意思是,实现一段从大文件中分块读取并输出到新文件(也分块)的代码并不难,因此内存使用在任何时候都不会成为问题(因为这两个文件都没有完全加载到内存中 - 只有当前的“块”)。即使“直接运行器”更像是一个测试语义的原型运行器,期望它能很好地处理大文件会不会太过分了? - 考虑到这是一个统一的模型,用于处理流式传输,其中 window 大小是任意的,巨大的数据 accumulation/aggregation 在下沉之前是一个标准用例。
所以我非常感谢您 feedback/comments 关于以下任何一点的问题:您是否注意到使用直接运行器的 IO 限制?是我忽略了某些方面还是直接执行者真的如此天真地实施?您是否通过使用像 flink/spark/google 云数据流这样的合适的生产运行器来验证此约束消失了?
我最终会与其他跑步者一起测试,如 flink 或 spark one,但直接跑步者(即使它仅用于原型设计目的)在第一次测试时遇到问题我感到很失望我' m 运行 on - 考虑到整个数据流理念是基于在统一 batch/streaming 模型的保护下摄取、处理、分组和分发大量数据。
编辑(反映 Kenn 的反馈): Kenn,感谢您提供的宝贵意见和反馈,它们对我指向相关文档提供了很大帮助。根据你的建议,我通过分析应用程序发现问题确实是一个 java 堆相关的问题(不知何故从未在普通控制台上显示 - 并且只在分析器上看到)。尽管该文件的大小“仅”为 1.25GB,但在转储堆之前内部使用量超过了 4GB,这表明直接运行程序不是“按块工作”,而是确实将所有内容加载到内存中(正如他们的文档所说)。
关于你的观点:
1- 我相信序列化和改组仍然可以通过“逐块”实现来很好地实现。也许我对直跑者应该有什么能力抱有错误的期望,或者我没有完全掌握它的预期范围,现在我将避免在使用直跑者时进行非功能性类型的测试。
2 - 关于分片。我相信 NumOfShards 在写入阶段控制并行度(和输出文件的数量)(在此之前的处理应该仍然是完全并行的,并且只有在编写时,它才会使用尽可能多的工作人员 - 并生成尽可能多的文件 -明确规定)。相信这一点的两个原因是:首先,CPU 分析器总是显示 8 个忙碌的“direct-runner-workers”——反映我的 PC 拥有的逻辑核心数量——独立于我设置 1 个分片还是 N 个分片.第二个原因是我从这里的文档中了解到的 (https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/WriteFiles.html) :
By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of output will vary based on runner behavior, though at least 1 output will always be produced. The exact parallelism of the write stage can be controlled using withNumShards(int), typically used to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option can often hurt performance: it adds an additional GroupByKey to the pipeline.
这里有一件有趣的事情是“将额外的 GroupByKey 添加到管道”在我的用例中有点不受欢迎(我只希望在 1 个文件中得到结果,而不考虑顺序或分组), 因此,在生成 N 个分片输出文件之后,可能添加一个额外的“展平”文件步骤是一个更好的方法。
3 - 您的分析建议很准确,谢谢。
最终编辑 直接运行器不用于性能测试,仅用于原型设计和数据的良好形成。它没有任何按分区拆分和划分工作的机制,并且处理内存中的所有内容
存在一些问题或可能性。我会按优先顺序回答。
- 直接运行器用于测试非常小的数据。它专为最大质量保证而设计,性能不是最重要的。例如:
- 它随机打乱数据以确保您不依赖于生产中不存在的顺序
- 它会在每一步之后对数据进行序列化和反序列化,以确保数据能够正确传输(生产运行者会尽可能避免序列化)
- 它会检查您是否以禁止的方式改变了元素,这会导致您在生产中丢失数据
你描述的数据不是很大,DirectRunner在正常情况下最终还是可以处理的。
您指定了
numShards(1)
,它明确地消除了所有并行性。它会导致所有数据在单个线程中组合和处理,因此即使在 DirectRunner 上,它也会比它可能的要慢。通常,您会希望避免人为地限制并行度。如果有任何内存不足错误或其他阻止处理的错误,您应该会看到很多消息。否则,查看分析和 CPU 利用率以确定处理是否处于活动状态将很有帮助。
上面的 Kenn Knowles 已经间接回答了这个问题。直接运行器不用于性能测试,仅用于原型设计和数据的良好形成。它没有任何按分区拆分和划分工作的机制,并处理内存中的每个数据集。应使用其他运行器(如 Flink Runner)进行性能测试 - 这些将提供数据拆分和处理高 IO 瓶颈所需的基础设施类型。
更新: 添加到这个问题的要点,这里有一个相关的问题:
而这里的问题围绕着弄清楚直接跑步者是否可以处理巨大的数据集(我们已经在这里确定这是不可能的);上面提供的 link 指向天气生产运行器的讨论(如 flink/spark/cloud 数据流)可以开箱即用地处理大量数据集(简短的回答是肯定的,但请检查自己 link 进行更深入的讨论)。