PySpark:使用 binaryFiles() 函数读取二进制文件时进行分区
PySpark: Partitioning while reading a binary file using binaryFiles() function
sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8)
或
sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8)
使用上述任一代码,我试图 在我的 RDD 中创建 8 个分区{其中,我希望数据均匀分布在所有分区上}。当我打印 {rdd.getNumPartitions()} 时,显示的分区数仅为 8,但在 Spark UI,我观察到虽然做了8个分区,但是所有的二进制文件数据都只放在一个分区上。
注意:minPartition 属性不起作用。即使在设置 minPartitions = 5 之后,RDD 中的分区数也仅为 1。因此,使用了 partitionBy/repartition 函数。
这是期望的行为还是我遗漏了什么?
TL;DR 这是预期的行为。
自从您使用 binaryFiles
读取文件后,文件的全部内容作为单个记录加载,并且单个记录不能跨多个分区拆分。这里根本就没有什么可分发的。
Spark 2.4+,问题应该已解决,请参阅此答案下方@Rahul 的评论。
Spark 2.1-2.3,binaryFiles()
的 minPartitions
参数被忽略。参见 Spark-16575 and the commit changes to function setMinPartitions()。请注意,在提交中更改了函数中不再使用 minPartitions
的方式!
如果您使用 binaryFiles()
读取多个二进制文件,输入文件将根据以下内容合并到分区中:
spark.files.maxPartitionBytes
,默认 128 MB
spark.files.openCostInBytes
,默认 4 MB
spark.default.parallelism
- 您输入的总大小
描述了前三个配置项here。查看上面的提交更改以查看实际计算。
我有一个场景,我希望每个输入分区最多 40 MB,因此每个任务 40 MB...以增加解析时的并行性。 (Spark 将 128 MB 放入每个分区,减慢了我的应用程序。)我在调用 binaryFiles()
:
之前将 spark.files.maxPartitionBytes
设置为 40 M
spark = SparkSession \
.builder \
.config("spark.files.maxPartitionBytes", 40*1024*1024)
对于只有一个输入文件,@user9864979 的回答是正确的:不能仅使用 binaryFiles()
将单个文件拆分为多个分区。
当使用 Spark 1.6 读取多个文件时,minPartitions
参数确实有效,您必须使用它。如果不这样做,您将遇到 Spark-16575 问题:您所有的输入文件将只读入两个分区!
您会发现 Spark 通常会为您提供比您请求的更少的输入分区。我有一个场景,我希望每两个输入二进制文件有一个输入分区。我发现将 minPartitions
设置为 "the # of input files * 7 / 10" 可以大致满足我的需求。
我有另一种情况,我希望每个输入文件都有一个输入分区。我发现将 minPartitions
设置为 "the # of input files * 2" 给了我想要的东西。
Spark 1.5 binaryFiles()
的行为:每个输入文件都有一个分区。
sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8)
或
sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8)
使用上述任一代码,我试图 在我的 RDD 中创建 8 个分区{其中,我希望数据均匀分布在所有分区上}。当我打印 {rdd.getNumPartitions()} 时,显示的分区数仅为 8,但在 Spark UI,我观察到虽然做了8个分区,但是所有的二进制文件数据都只放在一个分区上。
注意:minPartition 属性不起作用。即使在设置 minPartitions = 5 之后,RDD 中的分区数也仅为 1。因此,使用了 partitionBy/repartition 函数。
这是期望的行为还是我遗漏了什么?
TL;DR 这是预期的行为。
自从您使用 binaryFiles
读取文件后,文件的全部内容作为单个记录加载,并且单个记录不能跨多个分区拆分。这里根本就没有什么可分发的。
Spark 2.4+,问题应该已解决,请参阅此答案下方@Rahul 的评论。
Spark 2.1-2.3,binaryFiles()
的 minPartitions
参数被忽略。参见 Spark-16575 and the commit changes to function setMinPartitions()。请注意,在提交中更改了函数中不再使用 minPartitions
的方式!
如果您使用 binaryFiles()
读取多个二进制文件,输入文件将根据以下内容合并到分区中:
spark.files.maxPartitionBytes
,默认 128 MBspark.files.openCostInBytes
,默认 4 MBspark.default.parallelism
- 您输入的总大小
描述了前三个配置项here。查看上面的提交更改以查看实际计算。
我有一个场景,我希望每个输入分区最多 40 MB,因此每个任务 40 MB...以增加解析时的并行性。 (Spark 将 128 MB 放入每个分区,减慢了我的应用程序。)我在调用 binaryFiles()
:
spark.files.maxPartitionBytes
设置为 40 M
spark = SparkSession \
.builder \
.config("spark.files.maxPartitionBytes", 40*1024*1024)
对于只有一个输入文件,@user9864979 的回答是正确的:不能仅使用 binaryFiles()
将单个文件拆分为多个分区。
当使用 Spark 1.6 读取多个文件时,minPartitions
参数确实有效,您必须使用它。如果不这样做,您将遇到 Spark-16575 问题:您所有的输入文件将只读入两个分区!
您会发现 Spark 通常会为您提供比您请求的更少的输入分区。我有一个场景,我希望每两个输入二进制文件有一个输入分区。我发现将 minPartitions
设置为 "the # of input files * 7 / 10" 可以大致满足我的需求。
我有另一种情况,我希望每个输入文件都有一个输入分区。我发现将 minPartitions
设置为 "the # of input files * 2" 给了我想要的东西。
Spark 1.5 binaryFiles()
的行为:每个输入文件都有一个分区。