Apache Spark Shuffle 写入但没有 Shuffle 读取
Apache Spark Shuffle Writes but no Shuffle Reads
又是一个 spark shuffle 问题...
我在一个相当大的实例(160GB RAM,40 个内核)上设置了一个节点。我确实想训练 ~250 个具有不同参数的 ALS 模型(基于相同的数据帧)。
在调查磁盘问题 I/O 多日后,我发现了这个对话:http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminating-shuffle-write-and-spill-disk-IO-reads-writes-in-Spark-td16955.html
我确实像他们说的那样并将我的 spark.local.dir
指向了 RAM 磁盘。我认为它完成了工作,现在所有内核都得到了很好的使用。好!
但是,我确实在执行程序选项卡中看到了一些我无法理解的内容:
貌似只有Shuffle写,没有读。简单的问题:为什么?有必要吗?如果没有:我怎样才能避免这种情况?
执行的主要块:
val modelsAndResults = parameters.par.map( e => {
var auc = 0d
splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
val trainingDataset = spark.createDataFrame(training, schema).cache()
val validationDataset = spark.createDataFrame(validation, schema).cache()
val als = baseALS.copy(ParamMap(
baseALS.rank -> e._2,
baseALS.maxIter -> e._1,
baseALS.alpha -> e._4,
baseALS.regParam -> e._5,
baseALS.nonnegative -> e._6));
//println(s"$e evaluating $splitIndex ...")
val localAUC = new BinaryClassificationEvaluator().evaluate(
validationDataset
.join(als.fit(trainingDataset).transform(validationDataset.drop("time")), Seq("user","article"))
.withColumn("label", when($"time" >= e._2, 1d).otherwise(0d)).drop("time")
.withColumn("rawPrediction", $"prediction".cast(DoubleType))
.select("label","rawPrediction"))
auc += localAUC
trainingDataset.unpersist()
validationDataset.unpersist()
//println(s"$e evaluating $splitIndex ... localAUC: $localAUC")
}
val finalAUC = auc / splits.size
val csv = CSVWriter.open("cf_gridsearch_results_12345.csv", append = true)
csv.writeRow(List(e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))
csv.close()
println((e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))
(e._1,e._2,e._3,e._4,e._5,e._6,finalAUC)
})
Shuffling是指多个Spark阶段之间的数据交换。当所有数据在传输之前从所有执行程序序列化时,随机写入出现在阶段的末尾。随机读取发生在从所有执行者收集数据的阶段开始时。为了使用 shuffle read/write 获得完整图片,您必须 运行 在集群模式下。在这种情况下,随机读写将被触发
编辑
在您的情况下,您 运行 在一个实例中与一个执行程序一起使用,这意味着不需要从其他执行程序中获取分区,因此不会出现随机读取。关于随机写入,它通过调用缓存在数据帧的保存操作期间出现。
又是一个 spark shuffle 问题...
我在一个相当大的实例(160GB RAM,40 个内核)上设置了一个节点。我确实想训练 ~250 个具有不同参数的 ALS 模型(基于相同的数据帧)。
在调查磁盘问题 I/O 多日后,我发现了这个对话:http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminating-shuffle-write-and-spill-disk-IO-reads-writes-in-Spark-td16955.html
我确实像他们说的那样并将我的 spark.local.dir
指向了 RAM 磁盘。我认为它完成了工作,现在所有内核都得到了很好的使用。好!
但是,我确实在执行程序选项卡中看到了一些我无法理解的内容:
貌似只有Shuffle写,没有读。简单的问题:为什么?有必要吗?如果没有:我怎样才能避免这种情况?
执行的主要块:
val modelsAndResults = parameters.par.map( e => {
var auc = 0d
splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
val trainingDataset = spark.createDataFrame(training, schema).cache()
val validationDataset = spark.createDataFrame(validation, schema).cache()
val als = baseALS.copy(ParamMap(
baseALS.rank -> e._2,
baseALS.maxIter -> e._1,
baseALS.alpha -> e._4,
baseALS.regParam -> e._5,
baseALS.nonnegative -> e._6));
//println(s"$e evaluating $splitIndex ...")
val localAUC = new BinaryClassificationEvaluator().evaluate(
validationDataset
.join(als.fit(trainingDataset).transform(validationDataset.drop("time")), Seq("user","article"))
.withColumn("label", when($"time" >= e._2, 1d).otherwise(0d)).drop("time")
.withColumn("rawPrediction", $"prediction".cast(DoubleType))
.select("label","rawPrediction"))
auc += localAUC
trainingDataset.unpersist()
validationDataset.unpersist()
//println(s"$e evaluating $splitIndex ... localAUC: $localAUC")
}
val finalAUC = auc / splits.size
val csv = CSVWriter.open("cf_gridsearch_results_12345.csv", append = true)
csv.writeRow(List(e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))
csv.close()
println((e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))
(e._1,e._2,e._3,e._4,e._5,e._6,finalAUC)
})
Shuffling是指多个Spark阶段之间的数据交换。当所有数据在传输之前从所有执行程序序列化时,随机写入出现在阶段的末尾。随机读取发生在从所有执行者收集数据的阶段开始时。为了使用 shuffle read/write 获得完整图片,您必须 运行 在集群模式下。在这种情况下,随机读写将被触发
编辑
在您的情况下,您 运行 在一个实例中与一个执行程序一起使用,这意味着不需要从其他执行程序中获取分区,因此不会出现随机读取。关于随机写入,它通过调用缓存在数据帧的保存操作期间出现。