Spark 不忽略空分区
Spark not ignoring empty partitions
我正在尝试使用下推谓词读取数据集的子集。
我的输入数据集包含存储在 s3 上的 1.2TB 和 43436 个镶木地板文件。使用下推谓词,我应该读取 1/4 的数据。
看到火花UI。我看到该作业实际上读取了 1/4 的数据 (300GB),但在作业的第一阶段仍有 43436 个分区,但是这些分区中只有 1/4 有数据,其他 3/4 是空的(检查所附屏幕截图中的中位数输入数据)。
我原以为 Spark 只为非空分区创建分区。与直接通过另一个作业(1/4 的数据)读取预过滤数据集相比,使用下推谓词读取整个数据集时,我看到 20% 的性能开销。我怀疑这个开销是由于我在第一阶段有大量的空partitions/tasks,所以我有两个问题:
- 是否有任何解决方法来避免这些空分区?
- 您认为还有其他原因导致开销吗?可能是下推过滤器执行自然有点慢?
提前致谢
使用 S3 Select,您只能检索数据的一个子集。
With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
否则,S3 充当对象存储,在这种情况下,必须读取整个对象。 在你的情况下,你必须读取所有文件的所有内容,并在客户端过滤它们。
其实很像,这里通过测试可以看出:
The input size was always the same as the Spark job that processed all of the data
您还可以看到this question about optimizing data read from s3 of parquet files。
您的文件似乎很小:1.2TB / 43436 ≈ 30MB。所以你可能想看看增加 spark.sql.files.maxPartitionBytes
,看看它是否减少了分区总数。我对 S3 没有太多经验,所以不确定它是否会在其描述中提供此注释的帮助:
The maximum number of bytes to pack into a single partition when
reading files. This configuration is effective only when using
file-based sources such as Parquet, JSON and ORC.
空分区: 似乎 spark (2.4.5) 试图真正拥有大小≈spark.sql.files.maxPartitionBytes (default 128MB) by packing many files into one partition, source code here 的分区。
然而,它在 运行 作业 之前 完成这项工作,因此它不知道在应用下推谓词后 3/4 的文件将不会输出数据。对于只放置其行将被过滤掉的文件的分区,我最终得到了空分区。这也解释了为什么我的最大分区大小是 44MB 而不是 128MB,因为 none 个分区偶然有文件通过了所有下推过滤器。
20% 开销: 最后这不是因为空分区,我通过将 spark.sql.files.maxPartitionBytes 设置为 1gb 设法减少了空分区,但事实并非如此改善阅读。我认为开销是由于 打开许多文件并读取它们的元数据 。
Spark 估计打开一个文件相当于读取 4MB spark.sql.files.openCostInBytes。因此,即使由于过滤器不会被读取,打开许多文件也不应该被忽略..
我正在尝试使用下推谓词读取数据集的子集。 我的输入数据集包含存储在 s3 上的 1.2TB 和 43436 个镶木地板文件。使用下推谓词,我应该读取 1/4 的数据。
看到火花UI。我看到该作业实际上读取了 1/4 的数据 (300GB),但在作业的第一阶段仍有 43436 个分区,但是这些分区中只有 1/4 有数据,其他 3/4 是空的(检查所附屏幕截图中的中位数输入数据)。
我原以为 Spark 只为非空分区创建分区。与直接通过另一个作业(1/4 的数据)读取预过滤数据集相比,使用下推谓词读取整个数据集时,我看到 20% 的性能开销。我怀疑这个开销是由于我在第一阶段有大量的空partitions/tasks,所以我有两个问题:
- 是否有任何解决方法来避免这些空分区?
- 您认为还有其他原因导致开销吗?可能是下推过滤器执行自然有点慢?
提前致谢
使用 S3 Select,您只能检索数据的一个子集。
With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
否则,S3 充当对象存储,在这种情况下,必须读取整个对象。 在你的情况下,你必须读取所有文件的所有内容,并在客户端过滤它们。
其实很像
The input size was always the same as the Spark job that processed all of the data
您还可以看到this question about optimizing data read from s3 of parquet files。
您的文件似乎很小:1.2TB / 43436 ≈ 30MB。所以你可能想看看增加 spark.sql.files.maxPartitionBytes
,看看它是否减少了分区总数。我对 S3 没有太多经验,所以不确定它是否会在其描述中提供此注释的帮助:
The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
空分区: 似乎 spark (2.4.5) 试图真正拥有大小≈spark.sql.files.maxPartitionBytes (default 128MB) by packing many files into one partition, source code here 的分区。 然而,它在 运行 作业 之前 完成这项工作,因此它不知道在应用下推谓词后 3/4 的文件将不会输出数据。对于只放置其行将被过滤掉的文件的分区,我最终得到了空分区。这也解释了为什么我的最大分区大小是 44MB 而不是 128MB,因为 none 个分区偶然有文件通过了所有下推过滤器。
20% 开销: 最后这不是因为空分区,我通过将 spark.sql.files.maxPartitionBytes 设置为 1gb 设法减少了空分区,但事实并非如此改善阅读。我认为开销是由于 打开许多文件并读取它们的元数据 。 Spark 估计打开一个文件相当于读取 4MB spark.sql.files.openCostInBytes。因此,即使由于过滤器不会被读取,打开许多文件也不应该被忽略..