Spark 不忽略空分区

Spark not ignoring empty partitions

我正在尝试使用下推谓词读取数据集的子集。 我的输入数据集包含存储在 s3 上的 1.2TB 和 43436 个镶木地板文件。使用下推谓词,我应该读取 1/4 的数据。

看到火花UI。我看到该作业实际上读取了 1/4 的数据 (300GB),但在作业的第一阶段仍有 43436 个分区,但是这些分区中只有 1/4 有数据,其他 3/4 是空的(检查所附屏幕截图中的中位数输入数据)。

我原以为 Spark 只为非空分区创建分区。与直接通过另一个作业(1/4 的数据)读取预过滤数据集相比,使用下推谓词读取整个数据集时,我看到 20% 的性能开销。我怀疑这个开销是由于我在第一阶段有大量的空partitions/tasks,所以我有两个问题:

  1. 是否有任何解决方法来避免这些空分区?
  2. 您认为还有其他原因导致开销吗?可能是下推过滤器执行自然有点慢?

提前致谢

使用 S3 Select,您只能检索数据的一个子集。

With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.

否则,S3 充当对象存储,在这种情况下,必须读取整个对象。 在你的情况下,你必须读取所有文件的所有内容,并在客户端过滤它们

其实很像,这里通过测试可以看出:

The input size was always the same as the Spark job that processed all of the data

您还可以看到this question about optimizing data read from s3 of parquet files

您的文件似乎很小:1.2TB / 43436 ≈ 30MB。所以你可能想看看增加 spark.sql.files.maxPartitionBytes,看看它是否减少了分区总数。我对 S3 没有太多经验,所以不确定它是否会在其描述中提供此注释的帮助:

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

空分区: 似乎 spark (2.4.5) 试图真正拥有大小≈spark.sql.files.maxPartitionBytes (default 128MB) by packing many files into one partition, source code here 的分区。 然而,它在 运行 作业 之前 完成这项工作,因此它不知道在应用下推谓词后 3/4 的文件将不会输出数据。对于只放置其行将被过滤掉的文件的分区,我最终得到了空分区。这也解释了为什么我的最大分区大小是 44MB 而不是 128MB,因为 none 个分区偶然有文件通过了所有下推过滤器。

20% 开销: 最后这不是因为空分区,我通过将 spark.sql.files.maxPartitionBytes 设置为 1gb 设法减少了空分区,但事实并非如此改善阅读。我认为开销是由于 打开许多文件并读取它们的元数据 。 Spark 估计打开一个文件相当于读取 4MB spark.sql.files.openCostInBytes。因此,即使由于过滤器不会被读取,打开许多文件也不应该被忽略..