PySpark:使用 binaryFiles() 函数读取二进制文件时进行分区

PySpark: Partitioning while reading a binary file using binaryFiles() function

sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8)

sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8)

使用上述任一代码,我试图 在我的 RDD 中创建 8 个分区{其中,我希望数据均匀分布在所有分区上}。当我打印 {rdd.getNumPartitions()} 时,显示的分区数仅为 8,但在 Spark UI,我观察到虽然做了8个分区,但是所有的二进制文件数据都只放在一个分区上。

注意:minPartition 属性不起作用。即使在设置 minPartitions = 5 之后,RDD 中的分区数也仅为 1。因此,使用了 partitionBy/repartition 函数。

这是期望的行为还是我遗漏了什么?

TL;DR 这是预期的行为。

自从您使用 binaryFiles 读取文件后,文件的全部内容作为单个记录加载,并且单个记录不能跨多个分区拆分。这里根本就没有什么可分发的。

Spark 2.4+,问题应该已解决,请参阅此答案下方@Rahul 的评论。

Spark 2.1-2.3binaryFiles()minPartitions 参数被忽略。参见 Spark-16575 and the commit changes to function setMinPartitions()。请注意,在提交中更改了函数中不再使用 minPartitions 的方式!

如果您使用 binaryFiles() 读取多个二进制文件,输入文件将根据以下内容合并到分区中:

  • spark.files.maxPartitionBytes,默认 128 MB
  • spark.files.openCostInBytes,默认 4 MB
  • spark.default.parallelism
  • 您输入的总大小

描述了前三个配置项here。查看上面的提交更改以查看实际计算。

我有一个场景,我希望每个输入分区最多 40 MB,因此每个任务 40 MB...以增加解析时的并行性。 (Spark 将 128 MB 放入每个分区,减慢了我的应用程序。)我在调用 binaryFiles():

之前将 spark.files.maxPartitionBytes 设置为 40 M
spark = SparkSession \
   .builder \
   .config("spark.files.maxPartitionBytes", 40*1024*1024)

对于只有一个输入文件,@user9864979 的回答是正确的:不能仅使用 binaryFiles() 将单个文件拆分为多个分区。


当使用 Spark 1.6 读取多个文件时,minPartitions 参数确实有效,您必须使用它。如果不这样做,您将遇到 Spark-16575 问题:您所有的输入文件将只读入两个分区!

您会发现 Spark 通常会为您提供比您请求的更少的输入分区。我有一个场景,我希望每两个输入二进制文件有一个输入分区。我发现将 minPartitions 设置为 "the # of input files * 7 / 10" 可以大致满足我的需求。
我有另一种情况,我希望每个输入文件都有一个输入分区。我发现将 minPartitions 设置为 "the # of input files * 2" 给了我想要的东西。

Spark 1.5 binaryFiles() 的行为:每个输入文件都有一个分区。