将文件保存到 Parquet 时,分区列移动到行尾

Partition column is moved to end of row when saving a file to Parquet

对于给定的 DataFrame,就在 saveparquet 之前,这里是模式:注意 centroid0 第一个 列并且是 StringType:

但是当保存文件时使用:

      df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)

并且 partitionColscentroid0:

然后有一个(对我来说)令人惊讶的结果:

我通过println确认了输出路径:

 path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters

这是从保存的 parquet:

读取 back 后的架构

为什么会对输入模式进行这两项修改 - 如何避免 - 同时仍将 centroid0 保留为分区列?

Update 首选答案应该提及为什么/何时将分区添加到 end(与 beginning ) 的列列表。我们需要了解确定性排序。

此外 - 有什么方法可以使推断的列类型从 spark 变为 "change it's mind" 吗?我不得不将分区从 01 等更改为 c0c1 等,以便推断映射到 StringType。也许这是必需的......但如果有一些火花设置来改变行为,那将是一个很好的答案。

其实原因很简单。按列分区时,每个分区只能包含该列的一个值。因此,实际上在文件中各处写入相同的值是没有用的,这就是 Spark 不这样做的原因。读取文件时,Spark 使用文件名中包含的信息重建分区列并将其放在模式的末尾。列的类型未存储,它是在读取时推断的,因此在您的情况下是整数类型。 注意:关于为什么在末尾添加该列没有特别的原因。它可能是在一开始。我想这只是一个任意的实现选择。

为避免丢失列的类型和顺序,您可以像这样复制分区列df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...")

虽然你会浪费 space。此外,例如,spark 使用分区来优化过滤器。不要忘记在读取数据后使用 X 列进行筛选,而不是您的列,否则 Spark 将无法执行任何优化。

当您 write.partitionBy(...) Spark 将分区字段保存为文件夹时 这对于以后读取数据可能是有益的,因为(对于某些文件类型,包括镶木地板)它可以优化以仅从您使用的分区读取数据(即,如果您读取并过滤 centroid0==1 spark 将不会读取其他分区

这样做的效果是分区字段(在您的情况下为 centroid0)不会仅作为文件夹名称(centroid0=1centroid0=2 等)写入 parquet 文件。 )

这些的副作用是 1. 分区的类型是在 运行 时推断出来的(因为模式没有保存在镶木地板中)并且在你的情况下碰巧你只有整数值所以它被推断为整数。

另一个副作用是分区字段被添加到模式的 end/beginning 中,因为它从镶木地板文件中读取模式作为一个块,然后将分区字段添加到其中作为另一个(同样,它不再是存储在镶木地板中的架构的一部分)

实际上,您可以很容易地利用包含分区数据架构的案例 class 的列的排序。您将需要从路径中读取数据,其中分区列存储在下面以使 Spark 推断这些列的值。然后通过使用 case class 模式和如下语句简单地应用 re-ordering:

val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
      .schema(encoder.schema)
      .format("parquet")
      .option("mergeSchema", "true")
      .load(myPath)
      // reorder columns, since reading from partitioned data, the partitioning columns are put to end
      .select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
      .as[RecordType]