How to make Spark slaves use HDFS input files 'local' to them in a Hadoop+Spark cluster?

I have a cluster of 9 computers with Apache Hadoop 2.7.2 and Spark 2.0.0 installed on them. Each computer runs an HDFS datanode and a Spark slave. One of these computers also runs the HDFS namenode and the Spark master.

I have uploaded a few terabytes of gz archives to HDFS with replication=2. It turned out that some of the archives are corrupt, and I'd like to find them. It looks like 'gunzip -t' can help, so I'm trying to find a way to run a Spark application on the cluster such that each Spark executor tests, as far as possible, only the archives that are 'local' to it (i.e., one of whose replicas is located on the node where that executor runs). The following script runs, but sometimes Spark executors process files that are 'remote' to them in HDFS:

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom
// and placing this jar file into Spark's home directory):
//    ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
// means testing for corruption the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl
// using a Spark cluster with the Spark master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves

package com.qbeats.cortex

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam}
import sys.process._

object CommoncrawlArchivesTester extends App {
  object LogAccumulator extends AccumulatorParam[String] {
    def zero(initialValue: String): String = ""
    def addInPlace(log1: String, log2: String) = if (log1.isEmpty) log2 else log1 + "\n" + log2
  }

  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchivesTester"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)

      val text = sc.hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

      val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
        val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString
        class FilePath extends Iterable[String] {
          def iterator = List(fileName).iterator
        }
        val result = (sys.env("HADOOP_PREFIX") + "/bin/hadoop fs -cat " + fileName) #| "gunzip -t" !

        println("Processed %s.".format(fileName))
        if (result != 0) {
          log.add(fileName)
          println("Corrupt: %s.".format(fileName))
        }
        (new FilePath).iterator
      }

      val result = fileAndLine.collect()

      println("Corrupted files:")
      println(log.value)
    }
  }
}
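One way to see what is actually happening is to log, for every input split, the hosts that hold its replicas and the host the task ran on. The following diagnostic sketch is not part of the original script: the object name LocalityReport is made up, the hostname comparison is deliberately naive, and the HDFS path and submission style are assumed to match the script above (master supplied via spark-submit).

package com.qbeats.cortex

import java.net.InetAddress

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical diagnostic job: for every input split, report which hosts hold
// a replica and whether the task happened to run on one of them.
object LocalityReport extends App {
  val sc = new SparkContext(new SparkConf().setAppName("LocalityReport"))

  val rdd = sc.hadoopFile("hdfs://LV-WS10.lviv:9000/commoncrawl",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    .asInstanceOf[HadoopRDD[LongWritable, Text]]

  val report = rdd.mapPartitionsWithInputSplit { (split, _) =>
    val fileSplit = split.asInstanceOf[FileSplit]
    val taskHost  = InetAddress.getLocalHost.getHostName   // node the task ran on
    val replicas  = fileSplit.getLocations                 // nodes holding this split's blocks
    val isLocal   = replicas.exists(h => taskHost.startsWith(h) || h.startsWith(taskHost))
    Iterator(s"${fileSplit.getPath}: task on $taskHost, replicas ${replicas.mkString(",")}, local=$isLocal")
  }

  report.collect().foreach(println)
}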

What would you suggest?


Added later:

I tried another script that reads the files from HDFS via textFile(). It looks to me like the Spark executors don't prefer input files that are 'local' to them. Doesn't that contradict "Spark brings code to data, not data to code"?

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom)
//   ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9

package com.qbeats.cortex

import org.apache.spark.{SparkContext, SparkConf}

object CommoncrawlArchiveLinesCounter extends App {
  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchiveLinesCounter"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val nLines = sc.
        textFile(args(1) + "/*").
        mapPartitionsWithIndex( (index, it) => {
          println("Processing partition %s".format(index))
          it
        }).
        count
      println(nLines)
    }
  }
}
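For reference, Spark's delay scheduling is governed by spark.locality.wait (3s by default): the scheduler waits that long for a free core on a node that holds the data before launching the task at a less local level. A minimal sketch of raising it, with arbitrary 30s values, placed next to the other conf.set calls in the scripts above (this is one standard knob, not a guaranteed fix):

      // Illustrative values, not from the original post: give the scheduler more
      // time to find a free core on a node that actually holds the split before
      // it falls back to RACK_LOCAL or ANY placement.
      conf.set("spark.locality.wait", "30s")        // default is 3s
      conf.set("spark.locality.wait.node", "30s")   // wait specifically for NODE_LOCAL slots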

SAIF C, could you explain that in more detail, please?

I have solved the problem by switching from Spark's standalone mode to YARN.
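For reference, a minimal sketch of what the switch looks like in this setup (the --deploy-mode client flag and the dropped setMaster() call are illustrative assumptions, not the exact code used; the master is no longer passed as a program argument, so the args handling changes accordingly):

// Usage after the switch (illustrative; exact flags depend on the cluster setup):
//   ./bin/spark-submit --master yarn --deploy-mode client spark-cortex-fat.jar hdfs://LV-WS10.lviv:9000/commoncrawl 9

      val conf = new SparkConf().setAppName(appName)   // no setMaster(); --master yarn comes from spark-submit
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      val sc = new SparkContext(conf)

Note that on YARN, dynamic allocation additionally requires the Spark external shuffle service to be configured as an auxiliary service in the NodeManagers.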
