为什么SparkContext.textFile分区参数不生效?

Why does partition parameter of SparkContext.textFile not take effect?

scala> val p=sc.textFile("file:///c:/_home/so-posts.xml", 8) //i've 8 cores
p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21

scala> p.partitions.size
res33: Int = 729

我原以为会打印 8 个,但我在 Spark 中看到了 729 个任务 UI

编辑:

按照@zero323

的建议调用repartition()之后
scala> p1 = p.repartition(8)
scala> p1.partitions.size
res60: Int = 8
scala> p1.count

我仍然在 Spark UI 中看到 729 个任务,即使 spark-shell 打印 8.

如果你看一下签名

textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] 

您会看到您使用的参数称为 minPartitions,这几乎描述了它的功能。在某些情况下,甚至会忽略这一点,但这是另一回事。幕后使用的输入格式仍然决定如何计算拆分。

在这种特殊情况下,您可能会使用 mapred.min.split.size 来增加拆分大小(这将在加载期间起作用)或只是在加载后使用 repartition(这将在加载数据后生效)但在一般应该没有这个必要。

@zero323 搞定了,但我想我会添加更多(低级)背景知识,说明这个 minPartitions 输入参数如何影响分区数。

tl;dr 分区参数确实对 SparkContext.textFile 有影响,因为 最小值 (不准确!)分区数。

在这种使用 SparkContext.textFile, the number of partitions are calculated directly by org.apache.hadoop.mapred.TextInputFormat.getSplits(jobConf, minPartitions) 的特殊情况下,textFile 也使用了它。 TextInputFormat 只有 知道如何使用 Spark 分区(又名 拆分 )分布式数据只遵循建议。

来自 Hadoop 的 FileInputFormat's javadoc:

FileInputFormat is the base class for all file-based InputFormats. This provides a generic implementation of getSplits(JobConf, int). Subclasses of FileInputFormat can also override the isSplitable(FileSystem, Path) method to ensure input-files are not split-up and are processed as a whole by Mappers.

这是 Spark 如何利用 Hadoop 的一个很好的例子 API。

顺便说一句,您可能会发现 the sources 很有启发性 ;-)