Spark 创建的分区比 WholeTextFiles 上的 minPartitions 参数少
Spark Creates Less Partitions Then minPartitions Argument on WholeTextFiles
我有一个文件夹,里面有 14 个文件。我 运行 集群上有 10 个执行程序的 spark-submit,它的资源管理器是 yarn。
我这样创建我的第一个 RDD:
JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);
但是,files.getNumPartitions()
随机给我 7 或 8 个。然后我不在任何地方使用 coalesce/repartition,我用 7-8 个分区完成我的 DAG。
据我所知,我们给出的参数是 "minimum" 个分区数,所以为什么 Spark 将我的 RDD 划分为 7-8 个分区?
我也 运行 相同的程序有 20 个分区,它给了我 11 个分区。
我在这里看到了一个主题,但它是关于 "more" 个分区的,这对我一点帮助都没有。
注意:在程序中,我读取了另外一个文件夹,里面有10个文件,Spark成功创建了10个分区。我运行在这个成功的工作完成后进行上述有问题的转换。
文件大小:
1)25.07 KB
2)46.61 KB
3)126.34 KB
4)158.15 KB
5)169.21 KB
6)16.03 KB
7)67.41 KB
8)60.84 KB
9)70.83 KB
10)87.94 KB
11)99.29 KB
12)120.58 KB
13)170.43 KB
14)183.87 KB
文件在 HDFS 上,块大小为 128MB,复制因子 3。
It would have been more clear if we have size of each file. But code will not be wrong. I am adding this answer as per spark code base
首先,maxSplitSize的计算取决于目录大小 和 min partitions 传入 wholeTextFiles
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}
// file: WholeTextFileInputFormat.scala
根据maxSplitSize
拆分(Spark 中的分区)将从源中提取。
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray // Here number of splits will be decides
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
// file: WholeTextFileRDD.scala
有关读取文件和准备拆分的更多信息,请访问 CombineFileInputFormat#getSplits
class。
Note:
I referred Spark partitions as MapReduce splits here, as Spark
borrowed input and output formatters from MapReduce
我有一个文件夹,里面有 14 个文件。我 运行 集群上有 10 个执行程序的 spark-submit,它的资源管理器是 yarn。
我这样创建我的第一个 RDD:
JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);
但是,files.getNumPartitions()
随机给我 7 或 8 个。然后我不在任何地方使用 coalesce/repartition,我用 7-8 个分区完成我的 DAG。
据我所知,我们给出的参数是 "minimum" 个分区数,所以为什么 Spark 将我的 RDD 划分为 7-8 个分区?
我也 运行 相同的程序有 20 个分区,它给了我 11 个分区。
我在这里看到了一个主题,但它是关于 "more" 个分区的,这对我一点帮助都没有。
注意:在程序中,我读取了另外一个文件夹,里面有10个文件,Spark成功创建了10个分区。我运行在这个成功的工作完成后进行上述有问题的转换。
文件大小: 1)25.07 KB 2)46.61 KB 3)126.34 KB 4)158.15 KB 5)169.21 KB 6)16.03 KB 7)67.41 KB 8)60.84 KB 9)70.83 KB 10)87.94 KB 11)99.29 KB 12)120.58 KB 13)170.43 KB 14)183.87 KB
文件在 HDFS 上,块大小为 128MB,复制因子 3。
It would have been more clear if we have size of each file. But code will not be wrong. I am adding this answer as per spark code base
首先,maxSplitSize的计算取决于目录大小 和 min partitions 传入
wholeTextFiles
def setMinPartitions(context: JobContext, minPartitions: Int) { val files = listStatus(context).asScala val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong super.setMaxSplitSize(maxSplitSize) } // file: WholeTextFileInputFormat.scala
根据
maxSplitSize
拆分(Spark 中的分区)将从源中提取。inputFormat.setMinPartitions(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray // Here number of splits will be decides val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } // file: WholeTextFileRDD.scala
有关读取文件和准备拆分的更多信息,请访问 CombineFileInputFormat#getSplits
class。
Note:
I referred Spark partitions as MapReduce splits here, as Spark borrowed input and output formatters from MapReduce