Spark 是否在读取时维护镶木地板分区?

Does Spark maintain parquet partitioning on read?

我很难找到这个问题的答案。假设我将数据帧写入镶木地板,并使用 repartition 结合 partitionBy 来获得分区良好的镶木地板文件。见下文:

df.repartition(col("DATE")).write.partitionBy("DATE").parquet("/path/to/parquet/file")

稍后我想阅读 parquet 文件,所以我会这样做:

val df = spark.read.parquet("/path/to/parquet/file")

dataframe 是否按 "DATE" 分区?换句话说,如果 parquet 文件被分区,则 spark 在将其读入 spark 数据帧时会维护该分区。还是随机分区?

此外,为什么和为什么不对这个答案也会有帮助。

读取存储为 parquet 的数据时获取的分区数遵循许多与读取分区文本相同的规则:

  1. 如果SparkContext.minPartitions >= partitions count in data, SparkContext.minPartitions 将被返回。
  2. 如果分区在数据中的计数 >= SparkContext.parallelism,将返回 SparkContext.parallelism,但在一些非常小的分区情况下,#3 可能为真。
  3. 最后,如果数据中的分区计数介于 SparkContext.minPartitions 和 SparkContext.parallelism 之间,通常您会看到数据集分区中反映的分区。

请注意,分区的 parquet 文件很少有分区的完整数据局部性,这意味着,即使数据中的分区计数与读取的分区计数相匹配,也有如果您试图实现分区数据局部性以提高性能,则很有可能应在内存中对数据集进行重新分区。

鉴于您上面的用例,如果您计划在此基础上利用分区本地操作,我建议您立即在 "DATE" 列上重新分区。上述有关 minPartitions 和并行度设置的注意事项也适用于此。

val df = spark.read.parquet("/path/to/parquet/file")
df.repartition(col("DATE"))

您将根据默认为 128MB 的 spark 配置 spark.sql.files.maxPartitionBytes 获得分区数。并且数据不会按照写入时使用的分区列进行分区。

参考https://spark.apache.org/docs/latest/sql-performance-tuning.html

在您的问题中,有两种 方式可以说数据正在“分区”,它们是:

  1. via repartition,它使用哈希分区器将数据分布到特定数量的分区中。如果在您的问题中没有指定数字,则使用 spark.sql.shuffle.partitions 中的值,其默认值为 200。调用 .repartition 通常会触发随机播放,这意味着分区现在分布在您的执行程序池中。

  2. 通过 partitionBy,这是一种特定于 DataFrameWriter 的方法,告诉它在磁盘 上对数据进行分区 按一个键。这意味着写入的数据分布在根据您的分区列命名的子目录中,例如/path/to/parquet/file/DATE=<individual DATE value>。在此示例中,只有具有特定 DATE 值的行存储在每个 DATE= 子目录中

考虑到术语“分区”的这两种用法,在回答您的问题时有一些微妙的方面。由于您使用 partitionBy 并询问 Spark 是否“维护分区”,我怀疑您真正好奇的是 Spark 是否会进行 分区修剪 ,这是一种使用的技术显着提高在分区列上具有过滤器的查询的性能。如果 Spark 知道您查找的值不能在特定的子目录中,它就不会浪费任何时间来读取这些文件,因此您的查询可以更快地完成。

  1. 如果您读取数据的方式不是分区感知的,您将得到许多分区,类似于 bsplosion 的答案。 Spark 不会采用分区修剪,因此您不会获得 Spark 自动忽略读取某些文件以加快速度的好处1.

  2. 幸运的是,读取 Spark 中使用 partitionBy 编写的 parquet 文件是 分区感知读取。即使没有像 Hive 这样的 Metastore 告诉 Spark 文件在磁​​盘上分区,Spark 也会自动发现分区。请参阅 partition discovery in Spark 了解其在 parquet 中的工作原理。

我建议测试在 spark-shell 中读取您的数据集,以便您可以轻松地看到 .explain 的输出,这将让您验证 Spark 是否正确找到分区并删除那些不包含您查询中感兴趣的数据。关于这个 can be found here 的一篇不错的文章。简而言之,如果您看到 PartitionFilters: [],则表示 Spark 没有进行任何分区修剪。但是,如果您看到类似 PartitionFilters: [isnotnull(date#3), (date#3 = 2021-01-01)], 的内容,Spark 仅读取一组特定的 DATE 分区,因此查询执行通常要快得多。

1一个单独的细节是 parquet 在文件本身的列中存储有关数据的统计信息。如果这些统计数据可用于消除无法匹配您正在执行的任何过滤的数据块,例如在 DATE 上,即使您读取数据的方式不是分区感知的,您也会看到一些加速。这称为谓词下推。它之所以有效,是因为磁盘上的文件在使用 .partitionBy 时仍将仅包含 DATE 的特定值。 More info can be found here.