Spark 创建的分区比 WholeTextFiles 上的 minPartitions 参数少

Spark Creates Less Partitions Then minPartitions Argument on WholeTextFiles

我有一个文件夹,里面有 14 个文件。我 运行 集群上有 10 个执行程序的 spark-submit,它的资源管理器是 yarn。

我这样创建我的第一个 RDD:

JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);

但是,files.getNumPartitions()随机给我 7 或 8 个。然后我不在任何地方使用 coalesce/repartition,我用 7-8 个分区完成我的 DAG。

据我所知,我们给出的参数是 "minimum" 个分区数,所以为什么 Spark 将我的 RDD 划分为 7-8 个分区?

我也 运行 相同的程序有 20 个分区,它给了我 11 个分区。

我在这里看到了一个主题,但它是关于 "more" 个分区的,这对我一点帮助都没有。

注意:在程序中,我读取了另外一个文件夹,里面有10个文件,Spark成功创建了10个分区。我运行在这个成功的工作完成后进行上述有问题的转换。

文件大小: 1)25.07 KB 2)46.61 KB 3)126.34 KB 4)158.15 KB 5)169.21 KB 6)16.03 KB 7)67.41 KB 8)60.84 KB 9)70.83 KB 10)87.94 KB 11)99.29 KB 12)120.58 KB 13)170.43 KB 14)183.87 KB

文件在 HDFS 上,块大小为 128MB,复制因子 3。

It would have been more clear if we have size of each file. But code will not be wrong. I am adding this answer as per spark code base

  • 首先,maxSplitSize的计算取决于目录大小min partitions 传入 wholeTextFiles

        def setMinPartitions(context: JobContext, minPartitions: Int) {
          val files = listStatus(context).asScala
          val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
          val maxSplitSize = Math.ceil(totalLen * 1.0 /
            (if (minPartitions == 0) 1 else minPartitions)).toLong
          super.setMaxSplitSize(maxSplitSize)
        }
        // file: WholeTextFileInputFormat.scala
    

    link

  • 根据maxSplitSize 拆分(Spark 中的分区)将从源中提取。

        inputFormat.setMinPartitions(jobContext, minPartitions)
        val rawSplits = inputFormat.getSplits(jobContext).toArray // Here number of splits will be decides
        val result = new Array[Partition](rawSplits.size)
        for (i <- 0 until rawSplits.size) {
          result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
        }
        // file: WholeTextFileRDD.scala
    

    link

有关读取文件和准备拆分的更多信息,请访问 CombineFileInputFormat#getSplits class。

Note:

I referred Spark partitions as MapReduce splits here, as Spark borrowed input and output formatters from MapReduce