DataFrame partitionBy 到单个 Parquet 文件(每个分区)
DataFrame partitionBy to a single Parquet file (per partition)
我想重新分区/合并我的数据,以便每个分区将其保存到一个 Parquet 文件中。我还想使用 Spark SQL partitionBy API。所以我可以这样做:
df.coalesce(1)
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
我已经对此进行了测试,但它似乎表现不佳。这是因为数据集中只有一个分区可以处理,所有文件的分区、压缩和保存都必须由一个 CPU 核心完成。
我可以重写它以在调用 coalesce 之前手动进行分区(例如使用具有不同分区值的过滤器)。
但是有没有更好的方法使用标准 Spark SQL API?
根据定义:
coalesce(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.
您可以使用它通过 numPartitions 参数减少 RDD/DataFrame 中的分区数。在筛选大型数据集后,它对于 运行 操作更有效很有用。
关于您的代码,它执行不佳,因为您实际做的是:
将所有内容放入 1 个分区会使驱动程序过载,因为它将所有数据都放入驱动程序的 1 个分区中(这也不是一个好的做法)
coalesce
实际上会随机播放网络上的所有数据,这也可能导致性能损失。
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
shuffle 概念对于管理和理解非常重要。最好尽可能少地洗牌,因为这是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了为 shuffle 组织数据,Spark 生成了一组任务——map 任务来组织数据,以及一组 reduce 任务来聚合它。此命名法来自 MapReduce,与 Spark 的 map 和 reduce 操作没有直接关系。
在内部,单个地图任务的结果会保存在内存中,直到它们无法容纳为止。然后,这些根据目标分区进行排序并写入单个文件。在 reduce 端,任务读取相关的排序块。
关于分区parquet,我建议你阅读Spark Programming Guide for Performance Tuning.
中的答案 about Spark DataFrames with Parquet Partitioning and also this section
希望对您有所帮助!
我遇到了完全相同的问题,我找到了使用 DataFrame.repartition()
解决此问题的方法。使用 coalesce(1)
的问题是你的并行度下降到 1,最好的情况下它可能很慢,最坏的情况下会出错。增加这个数字也无济于事——如果你这样做 coalesce(10)
你会得到更多的并行度,但最终每个分区有 10 个文件。
要在不使用 coalesce()
的情况下为每个分区获取一个文件,请将 repartition()
与您希望对输出进行分区的相同列一起使用。所以在你的情况下,这样做:
import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")
一旦我这样做了,我就会为每个输出分区得到一个 parquet 文件,而不是多个文件。
我在 Python 中对此进行了测试,但我认为在 Scala 中它应该是相同的。
它与@mortada 的解决方案并没有太大关系,但这里有一些抽象,可确保您使用相同的分区来重新分区和写入,并演示排序:
def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
start = datetime.now()
(df.repartition(*partitions)
.sortWithinPartitions(*sort_within_partitions)
.write.partitionBy(*partitions)
# TODO: Format of your choosing here
.mode(SaveMode.Append).parquet(path)
# or, e.g.:
#.option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
)
print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
用法:
one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])
我想重新分区/合并我的数据,以便每个分区将其保存到一个 Parquet 文件中。我还想使用 Spark SQL partitionBy API。所以我可以这样做:
df.coalesce(1)
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
我已经对此进行了测试,但它似乎表现不佳。这是因为数据集中只有一个分区可以处理,所有文件的分区、压缩和保存都必须由一个 CPU 核心完成。
我可以重写它以在调用 coalesce 之前手动进行分区(例如使用具有不同分区值的过滤器)。
但是有没有更好的方法使用标准 Spark SQL API?
根据定义:
coalesce(numPartitions: Int): DataFrame Returns a new DataFrame that has exactly numPartitions partitions.
您可以使用它通过 numPartitions 参数减少 RDD/DataFrame 中的分区数。在筛选大型数据集后,它对于 运行 操作更有效很有用。
关于您的代码,它执行不佳,因为您实际做的是:
将所有内容放入 1 个分区会使驱动程序过载,因为它将所有数据都放入驱动程序的 1 个分区中(这也不是一个好的做法)
coalesce
实际上会随机播放网络上的所有数据,这也可能导致性能损失。
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
shuffle 概念对于管理和理解非常重要。最好尽可能少地洗牌,因为这是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了为 shuffle 组织数据,Spark 生成了一组任务——map 任务来组织数据,以及一组 reduce 任务来聚合它。此命名法来自 MapReduce,与 Spark 的 map 和 reduce 操作没有直接关系。
在内部,单个地图任务的结果会保存在内存中,直到它们无法容纳为止。然后,这些根据目标分区进行排序并写入单个文件。在 reduce 端,任务读取相关的排序块。
关于分区parquet,我建议你阅读Spark Programming Guide for Performance Tuning.
中的答案希望对您有所帮助!
我遇到了完全相同的问题,我找到了使用 DataFrame.repartition()
解决此问题的方法。使用 coalesce(1)
的问题是你的并行度下降到 1,最好的情况下它可能很慢,最坏的情况下会出错。增加这个数字也无济于事——如果你这样做 coalesce(10)
你会得到更多的并行度,但最终每个分区有 10 个文件。
要在不使用 coalesce()
的情况下为每个分区获取一个文件,请将 repartition()
与您希望对输出进行分区的相同列一起使用。所以在你的情况下,这样做:
import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")
一旦我这样做了,我就会为每个输出分区得到一个 parquet 文件,而不是多个文件。
我在 Python 中对此进行了测试,但我认为在 Scala 中它应该是相同的。
它与@mortada 的解决方案并没有太大关系,但这里有一些抽象,可确保您使用相同的分区来重新分区和写入,并演示排序:
def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
start = datetime.now()
(df.repartition(*partitions)
.sortWithinPartitions(*sort_within_partitions)
.write.partitionBy(*partitions)
# TODO: Format of your choosing here
.mode(SaveMode.Append).parquet(path)
# or, e.g.:
#.option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
)
print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
用法:
one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])