Apache Spark Shuffle 写入但没有 Shuffle 读取

Apache Spark Shuffle Writes but no Shuffle Reads

又是一个 spark shuffle 问题...

我在一个相当大的实例(160GB RAM,40 个内核)上设置了一个节点。我确实想训练 ~250 个具有不同参数的 ALS 模型(基于相同的数据帧)。

在调查磁盘问题 I/O 多日后,我发现了这个对话:http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminating-shuffle-write-and-spill-disk-IO-reads-writes-in-Spark-td16955.html

我确实像他们说的那样并将我的 spark.local.dir 指向了 RAM 磁盘。我认为它完成了工作,现在所有内核都得到了很好的使用。好!

但是,我确实在执行程序选项卡中看到了一些我无法理解的内容:

貌似只有Shuffle写,没有读。简单的问题:为什么?有必要吗?如果没有:我怎样才能避免这种情况?

执行的主要块:

    val modelsAndResults =  parameters.par.map( e => {
        var auc = 0d 

        splits.zipWithIndex.foreach { case ((training, validation), splitIndex) => 
                val trainingDataset = spark.createDataFrame(training, schema).cache()
                val validationDataset = spark.createDataFrame(validation, schema).cache()

                val als = baseALS.copy(ParamMap(
                    baseALS.rank -> e._2,
                    baseALS.maxIter -> e._1,
                    baseALS.alpha -> e._4,
                    baseALS.regParam -> e._5,
                    baseALS.nonnegative -> e._6));

                //println(s"$e evaluating $splitIndex ...")
                val localAUC = new BinaryClassificationEvaluator().evaluate(
                    validationDataset
                        .join(als.fit(trainingDataset).transform(validationDataset.drop("time")), Seq("user","article"))
                        .withColumn("label", when($"time" >= e._2, 1d).otherwise(0d)).drop("time")
                        .withColumn("rawPrediction", $"prediction".cast(DoubleType))
                        .select("label","rawPrediction"))
                auc += localAUC

                trainingDataset.unpersist()
                validationDataset.unpersist()

                //println(s"$e evaluating $splitIndex ... localAUC: $localAUC")
            }

        val finalAUC = auc / splits.size

        val csv = CSVWriter.open("cf_gridsearch_results_12345.csv", append = true)
        csv.writeRow(List(e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))
        csv.close()

        println((e._1,e._2,e._3,e._4,e._5,e._6,finalAUC))

        (e._1,e._2,e._3,e._4,e._5,e._6,finalAUC)
})

Shuffling是指多个Spark阶段之间的数据交换。当所有数据在传输之前从所有执行程序序列化时,随机写入出现在阶段的末尾。随机读取发生在从所有执行者收集数据的阶段开始时。为了使用 shuffle read/write 获得完整图片,您必须 运行 在集群模式下。在这种情况下,随机读写将被触发

编辑

在您的情况下,您 运行 在一个实例中与一个执行程序一起使用,这意味着不需要从其他执行程序中获取分区,因此不会出现随机读取。关于随机写入,它通过调用缓存在数据帧的保存操作期间出现。