将文件保存到 Parquet 时,分区列移动到行尾
Partition column is moved to end of row when saving a file to Parquet
对于给定的 DataFrame,就在 save
到 parquet
之前,这里是模式:注意 centroid0
是 第一个 列并且是 StringType
:
但是当保存文件时使用:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)
并且 partitionCols
为 centroid0
:
然后有一个(对我来说)令人惊讶的结果:
centroid0
分区列已移至行 的末端
- 数据类型已更改为
Integer
我通过println
确认了输出路径:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters
这是从保存的 parquet
:
读取 back 后的架构
为什么会对输入模式进行这两项修改 - 如何避免 - 同时仍将 centroid0
保留为分区列?
Update 首选答案应该提及为什么/何时将分区添加到 end(与 beginning ) 的列列表。我们需要了解确定性排序。
此外 - 有什么方法可以使推断的列类型从 spark
变为 "change it's mind" 吗?我不得不将分区从 0
、1
等更改为 c0
、c1
等,以便推断映射到 StringType
。也许这是必需的......但如果有一些火花设置来改变行为,那将是一个很好的答案。
其实原因很简单。按列分区时,每个分区只能包含该列的一个值。因此,实际上在文件中各处写入相同的值是没有用的,这就是 Spark 不这样做的原因。读取文件时,Spark 使用文件名中包含的信息重建分区列并将其放在模式的末尾。列的类型未存储,它是在读取时推断的,因此在您的情况下是整数类型。
注意:关于为什么在末尾添加该列没有特别的原因。它可能是在一开始。我想这只是一个任意的实现选择。
为避免丢失列的类型和顺序,您可以像这样复制分区列df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...")
。
虽然你会浪费 space。此外,例如,spark 使用分区来优化过滤器。不要忘记在读取数据后使用 X 列进行筛选,而不是您的列,否则 Spark 将无法执行任何优化。
当您 write.partitionBy(...)
Spark 将分区字段保存为文件夹时
这对于以后读取数据可能是有益的,因为(对于某些文件类型,包括镶木地板)它可以优化以仅从您使用的分区读取数据(即,如果您读取并过滤 centroid0==1 spark 将不会读取其他分区
这样做的效果是分区字段(在您的情况下为 centroid0
)不会仅作为文件夹名称(centroid0=1
、centroid0=2
等)写入 parquet 文件。 )
这些的副作用是 1. 分区的类型是在 运行 时推断出来的(因为模式没有保存在镶木地板中)并且在你的情况下碰巧你只有整数值所以它被推断为整数。
另一个副作用是分区字段被添加到模式的 end/beginning 中,因为它从镶木地板文件中读取模式作为一个块,然后将分区字段添加到其中作为另一个(同样,它不再是存储在镶木地板中的架构的一部分)
实际上,您可以很容易地利用包含分区数据架构的案例 class 的列的排序。您将需要从路径中读取数据,其中分区列存储在下面以使 Spark 推断这些列的值。然后通过使用 case class 模式和如下语句简单地应用 re-ordering:
val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
.schema(encoder.schema)
.format("parquet")
.option("mergeSchema", "true")
.load(myPath)
// reorder columns, since reading from partitioned data, the partitioning columns are put to end
.select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
.as[RecordType]
对于给定的 DataFrame,就在 save
到 parquet
之前,这里是模式:注意 centroid0
是 第一个 列并且是 StringType
:
但是当保存文件时使用:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)
并且 partitionCols
为 centroid0
:
然后有一个(对我来说)令人惊讶的结果:
centroid0
分区列已移至行 的末端
- 数据类型已更改为
Integer
我通过println
确认了输出路径:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters
这是从保存的 parquet
:
为什么会对输入模式进行这两项修改 - 如何避免 - 同时仍将 centroid0
保留为分区列?
Update 首选答案应该提及为什么/何时将分区添加到 end(与 beginning ) 的列列表。我们需要了解确定性排序。
此外 - 有什么方法可以使推断的列类型从 spark
变为 "change it's mind" 吗?我不得不将分区从 0
、1
等更改为 c0
、c1
等,以便推断映射到 StringType
。也许这是必需的......但如果有一些火花设置来改变行为,那将是一个很好的答案。
其实原因很简单。按列分区时,每个分区只能包含该列的一个值。因此,实际上在文件中各处写入相同的值是没有用的,这就是 Spark 不这样做的原因。读取文件时,Spark 使用文件名中包含的信息重建分区列并将其放在模式的末尾。列的类型未存储,它是在读取时推断的,因此在您的情况下是整数类型。 注意:关于为什么在末尾添加该列没有特别的原因。它可能是在一开始。我想这只是一个任意的实现选择。
为避免丢失列的类型和顺序,您可以像这样复制分区列df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...")
。
虽然你会浪费 space。此外,例如,spark 使用分区来优化过滤器。不要忘记在读取数据后使用 X 列进行筛选,而不是您的列,否则 Spark 将无法执行任何优化。
当您 write.partitionBy(...)
Spark 将分区字段保存为文件夹时
这对于以后读取数据可能是有益的,因为(对于某些文件类型,包括镶木地板)它可以优化以仅从您使用的分区读取数据(即,如果您读取并过滤 centroid0==1 spark 将不会读取其他分区
这样做的效果是分区字段(在您的情况下为 centroid0
)不会仅作为文件夹名称(centroid0=1
、centroid0=2
等)写入 parquet 文件。 )
这些的副作用是 1. 分区的类型是在 运行 时推断出来的(因为模式没有保存在镶木地板中)并且在你的情况下碰巧你只有整数值所以它被推断为整数。
另一个副作用是分区字段被添加到模式的 end/beginning 中,因为它从镶木地板文件中读取模式作为一个块,然后将分区字段添加到其中作为另一个(同样,它不再是存储在镶木地板中的架构的一部分)
实际上,您可以很容易地利用包含分区数据架构的案例 class 的列的排序。您将需要从路径中读取数据,其中分区列存储在下面以使 Spark 推断这些列的值。然后通过使用 case class 模式和如下语句简单地应用 re-ordering:
val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
.schema(encoder.schema)
.format("parquet")
.option("mergeSchema", "true")
.load(myPath)
// reorder columns, since reading from partitioned data, the partitioning columns are put to end
.select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
.as[RecordType]