最简单程序的大任务
Large task size for simplest program
我正在尝试 运行 最简单的 Spark 程序
import org.apache.spark.{SparkContext, SparkConf}
object LargeTaskTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dat = (1 to 10000000).toList
val data = sc.parallelize(dat).cache()
for(i <- 1 to 100){
println(data.reduce(_ + _))
}
}
}
每次迭代后我收到以下错误消息:
WARN TaskSetManager: Stage 0 contains a task of very large size (9767
KB). The maximum recommended task size is 100 KB.
增加数据大小会增加所述任务大小。这向我表明驱动程序正在将 "dat" 对象发送给所有执行程序,但我终究无法理解为什么,因为我的 RDD 上唯一的操作是 reduce,它基本上没有关闭。有任何想法吗 ?
Reduce 函数将所有数据发送到一个节点。当您 运行 sc.parallelize
时,数据默认分布到 100 个分区。要使用已经分发的数据,请使用如下内容:
data.map(el=> el%100 -> el).reduceByKey(_+_)
或者您可以在分区级别进行减少。
data.mapPartitions(p => Iterator(p.reduce(_ + _))).reduce(_ + _)
或者只使用 sum
:)
因为您首先在本地创建了一个非常大的列表,所以 Spark parallelize
方法试图将这个列表作为一个单独的单元发送给 Spark worker,作为任务的一部分。因此,您会收到警告消息。作为替代方案,您可以并行化一个更小的列表,然后使用 flatMap
将其分解为更大的列表。这也有利于并行创建更大的数字集。例如:
import org.apache.spark.{SparkContext, SparkConf}
object LargeTaskTest extends App {
val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dat = (0 to 99).toList
val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
println(data.count()) //100000000
println(data.reduce(_ + _))
sc.stop()
}
编辑:
最终必须将并行化的本地集合推送到执行程序。 parallelize
方法创建了一个 ParallelCollectionRDD 的实例:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD 创建的分区数等于 numSlices
:
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
numSlices
默认为 sc.defaultParallelism
,在我的机器上是 4。所以即使拆分时,每个分区都包含一个非常大的列表,需要将其推送给执行程序。
SparkContext.parallelize
包含注释 @note Parallelize acts lazily
并且 ParallelCollectionRDD
包含注释;
// TODO: Right now, each split sends along its full data, even if
later down the RDD chain it gets // cached. It might be worthwhile
to write the data to a file in the DFS and read it in the split //
instead.
所以问题似乎发生在您调用 reduce 时,因为这是将分区发送到执行程序的点,但根本原因是您在一个非常大的列表上调用 parallelize。在执行程序中生成大列表是更好的方法,恕我直言。
我正在尝试 运行 最简单的 Spark 程序
import org.apache.spark.{SparkContext, SparkConf}
object LargeTaskTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dat = (1 to 10000000).toList
val data = sc.parallelize(dat).cache()
for(i <- 1 to 100){
println(data.reduce(_ + _))
}
}
}
每次迭代后我收到以下错误消息:
WARN TaskSetManager: Stage 0 contains a task of very large size (9767 KB). The maximum recommended task size is 100 KB.
增加数据大小会增加所述任务大小。这向我表明驱动程序正在将 "dat" 对象发送给所有执行程序,但我终究无法理解为什么,因为我的 RDD 上唯一的操作是 reduce,它基本上没有关闭。有任何想法吗 ?
Reduce 函数将所有数据发送到一个节点。当您 运行 sc.parallelize
时,数据默认分布到 100 个分区。要使用已经分发的数据,请使用如下内容:
data.map(el=> el%100 -> el).reduceByKey(_+_)
或者您可以在分区级别进行减少。
data.mapPartitions(p => Iterator(p.reduce(_ + _))).reduce(_ + _)
或者只使用 sum
:)
因为您首先在本地创建了一个非常大的列表,所以 Spark parallelize
方法试图将这个列表作为一个单独的单元发送给 Spark worker,作为任务的一部分。因此,您会收到警告消息。作为替代方案,您可以并行化一个更小的列表,然后使用 flatMap
将其分解为更大的列表。这也有利于并行创建更大的数字集。例如:
import org.apache.spark.{SparkContext, SparkConf}
object LargeTaskTest extends App {
val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dat = (0 to 99).toList
val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
println(data.count()) //100000000
println(data.reduce(_ + _))
sc.stop()
}
编辑:
最终必须将并行化的本地集合推送到执行程序。 parallelize
方法创建了一个 ParallelCollectionRDD 的实例:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD 创建的分区数等于 numSlices
:
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
numSlices
默认为 sc.defaultParallelism
,在我的机器上是 4。所以即使拆分时,每个分区都包含一个非常大的列表,需要将其推送给执行程序。
SparkContext.parallelize
包含注释 @note Parallelize acts lazily
并且 ParallelCollectionRDD
包含注释;
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split // instead.
所以问题似乎发生在您调用 reduce 时,因为这是将分区发送到执行程序的点,但根本原因是您在一个非常大的列表上调用 parallelize。在执行程序中生成大列表是更好的方法,恕我直言。