如何让 Spark slave 在 Hadoop+Spark 集群中使用对它们而言 'local' 的 HDFS 输入文件?
How to make Spark slaves use HDFS input files 'local' to them in a Hadoop+Spark cluster?
我有一个由 9 台计算机组成的集群,上面安装了 Apache Hadoop 2.7.2 和 Spark 2.0.0。每台计算机上都运行着一个 HDFS 数据节点和一个 Spark slave。其中一台计算机还运行着 HDFS namenode 和 Spark master。
我已经以 replication=2 向 HDFS 上传了几 TB 的 gz 存档。结果发现其中一些存档已损坏,我想把它们找出来。看起来 'gunzip -t' 可以帮上忙。所以我想找到一种在集群上运行 Spark 应用程序的方法,使每个 Spark 执行程序尽可能只测试对它而言 'local' 的存档(即其中一个副本位于该执行程序所在的计算机上)。以下脚本可以运行,但有时 Spark 执行程序会处理 HDFS 中 'remote' 的文件:
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom
// and placing this jar file into Spark's home directory):
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
// means testing for corruption the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl
// using a Spark cluster with the Spark master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves
package com.qbeats.cortex
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam}
import sys.process._
/**
 * Spark driver that tests the gz archives under an HDFS directory for corruption.
 *
 * For every input split, the split's file is piped through
 * `hadoop fs -cat FILE | gunzip -t` inside the task that owns the split. Files for
 * which the pipeline exits with a non-zero status are recorded as corrupt and printed
 * by the driver once the job has finished.
 *
 * Arguments: MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS
 */
object CommoncrawlArchivesTester extends App {

  /** Accumulator parameter that joins file names into a newline-separated string. */
  object LogAccumulator extends AccumulatorParam[String] {

    /** The empty log; the supplied initial value is ignored. */
    def zero(initialValue: String): String = ""

    /**
     * Merges two partial logs. An empty operand on either side contributes nothing,
     * so merging an empty partial result does not append a stray newline.
     */
    def addInPlace(log1: String, log2: String): String =
      if (log1.isEmpty) log2
      else if (log2.isEmpty) log1
      else log1 + "\n" + log2
  }

  /**
   * Entry point.
   *
   * @param args MASTER_URL, HDFS_INPUT_DIR and INITIAL_EXECUTORS, in that order;
   *             with fewer than three arguments a usage message is printed and
   *             nothing is run.
   */
  override def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      Console.err.println(
        "Usage: CommoncrawlArchivesTester MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS")
    } else {
      val conf = new SparkConf()
        .setAppName("CommoncrawlArchivesTester")
        .setMaster(args(0))
        .set("spark.executor.memory", "6g")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)
      try {
        // NOTE(review): this accumulator is updated inside a transformation; per the
        // Spark programming guide such updates may be applied more than once when a
        // task is re-executed, so a file name could be listed repeatedly.
        val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)
        val hadoopRdd = sc
          .hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
          .asInstanceOf[HadoopRDD[LongWritable, Text]]

        val testedFiles = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, _) =>
          val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString
          val hadoopBin = sys.env("HADOOP_PREFIX") + "/bin/hadoop"
          // Pass the arguments as a sequence so that a path containing whitespace is
          // handed to hadoop as a single argument instead of being split into several.
          val exitCode =
            (Process(Seq(hadoopBin, "fs", "-cat", fileName)) #| Process(Seq("gunzip", "-t"))).!
          println("Processed %s.".format(fileName))
          if (exitCode != 0) {
            log.add(fileName)
            println("Corrupt: %s.".format(fileName))
          }
          Iterator.single(fileName)
        }

        // The collected file names are not needed; collect() only forces the job to run.
        testedFiles.collect()
        println("Corrupted files:")
        println(log.value)
      } finally {
        // Release the application's cluster resources even if the job fails.
        sc.stop()
      }
    }
  }
}
你有什么建议?
稍后添加:
我尝试了另一个脚本,它通过 textFile() 从 HDFS 获取文件。看起来 Spark 执行程序并不会在输入文件中优先选择对它而言 'local' 的文件。这和 "Spark brings code to data, not data to code" 不矛盾吗?
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom)
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
package com.qbeats.cortex
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Spark driver that counts the lines of all files matched by the glob formed by
 * appending a wildcard to the given HDFS directory, printing the index of each
 * partition as it is processed.
 *
 * Arguments: MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS
 */
object CommoncrawlArchiveLinesCounter extends App {

  /**
   * Entry point.
   *
   * @param args MASTER_URL, HDFS_INPUT_DIR and INITIAL_EXECUTORS, in that order;
   *             with fewer than three arguments a usage message is printed and
   *             nothing is run.
   */
  override def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      Console.err.println(
        "Usage: CommoncrawlArchiveLinesCounter MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS")
    } else {
      val conf = new SparkConf()
        .setAppName("CommoncrawlArchiveLinesCounter")
        .setMaster(args(0))
        .set("spark.executor.memory", "6g")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)
      try {
        val nLines = sc
          .textFile(args(1) + "/*")
          .mapPartitionsWithIndex { (index, lines) =>
            // Runs inside the task, once per partition, before its lines are consumed.
            println("Processing partition %s".format(index))
            lines
          }
          .count()
        println(nLines)
      } finally {
        // Release the application's cluster resources even if the job fails.
        sc.stop()
      }
    }
  }
}
SAIF C,能详细解释一下吗?
我已经通过从 Spark 的独立模式切换到 YARN 来解决问题。
相关主题:
我有一个由 9 台计算机组成的集群,上面安装了 Apache Hadoop 2.7.2 和 Spark 2.0.0。每台计算机上都运行着一个 HDFS 数据节点和一个 Spark slave。其中一台计算机还运行着 HDFS namenode 和 Spark master。
我已经以 replication=2 向 HDFS 上传了几 TB 的 gz 存档。结果发现其中一些存档已损坏,我想把它们找出来。看起来 'gunzip -t' 可以帮上忙。所以我想找到一种在集群上运行 Spark 应用程序的方法,使每个 Spark 执行程序尽可能只测试对它而言 'local' 的存档(即其中一个副本位于该执行程序所在的计算机上)。以下脚本可以运行,但有时 Spark 执行程序会处理 HDFS 中 'remote' 的文件:
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom
// and placing this jar file into Spark's home directory):
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
// means testing for corruption the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl
// using a Spark cluster with the Spark master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves
package com.qbeats.cortex
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam}
import sys.process._
/**
 * Spark driver that tests the gz archives under an HDFS directory for corruption.
 *
 * For every input split, the split's file is piped through
 * `hadoop fs -cat FILE | gunzip -t` inside the task that owns the split. Files for
 * which the pipeline exits with a non-zero status are recorded as corrupt and printed
 * by the driver once the job has finished.
 *
 * Arguments: MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS
 */
object CommoncrawlArchivesTester extends App {

  /** Accumulator parameter that joins file names into a newline-separated string. */
  object LogAccumulator extends AccumulatorParam[String] {

    /** The empty log; the supplied initial value is ignored. */
    def zero(initialValue: String): String = ""

    /**
     * Merges two partial logs. An empty operand on either side contributes nothing,
     * so merging an empty partial result does not append a stray newline.
     */
    def addInPlace(log1: String, log2: String): String =
      if (log1.isEmpty) log2
      else if (log2.isEmpty) log1
      else log1 + "\n" + log2
  }

  /**
   * Entry point.
   *
   * @param args MASTER_URL, HDFS_INPUT_DIR and INITIAL_EXECUTORS, in that order;
   *             with fewer than three arguments a usage message is printed and
   *             nothing is run.
   */
  override def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      Console.err.println(
        "Usage: CommoncrawlArchivesTester MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS")
    } else {
      val conf = new SparkConf()
        .setAppName("CommoncrawlArchivesTester")
        .setMaster(args(0))
        .set("spark.executor.memory", "6g")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)
      try {
        // NOTE(review): this accumulator is updated inside a transformation; per the
        // Spark programming guide such updates may be applied more than once when a
        // task is re-executed, so a file name could be listed repeatedly.
        val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)
        val hadoopRdd = sc
          .hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
          .asInstanceOf[HadoopRDD[LongWritable, Text]]

        val testedFiles = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, _) =>
          val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString
          val hadoopBin = sys.env("HADOOP_PREFIX") + "/bin/hadoop"
          // Pass the arguments as a sequence so that a path containing whitespace is
          // handed to hadoop as a single argument instead of being split into several.
          val exitCode =
            (Process(Seq(hadoopBin, "fs", "-cat", fileName)) #| Process(Seq("gunzip", "-t"))).!
          println("Processed %s.".format(fileName))
          if (exitCode != 0) {
            log.add(fileName)
            println("Corrupt: %s.".format(fileName))
          }
          Iterator.single(fileName)
        }

        // The collected file names are not needed; collect() only forces the job to run.
        testedFiles.collect()
        println("Corrupted files:")
        println(log.value)
      } finally {
        // Release the application's cluster resources even if the job fails.
        sc.stop()
      }
    }
  }
}
你有什么建议?
稍后添加:
我尝试了另一个脚本,它通过 textFile() 从 HDFS 获取文件。看起来 Spark 执行程序并不会在输入文件中优先选择对它而言 'local' 的文件。这和 "Spark brings code to data, not data to code" 不矛盾吗?
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom)
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
package com.qbeats.cortex
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Spark driver that counts the lines of all files matched by the glob formed by
 * appending a wildcard to the given HDFS directory, printing the index of each
 * partition as it is processed.
 *
 * Arguments: MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS
 */
object CommoncrawlArchiveLinesCounter extends App {

  /**
   * Entry point.
   *
   * @param args MASTER_URL, HDFS_INPUT_DIR and INITIAL_EXECUTORS, in that order;
   *             with fewer than three arguments a usage message is printed and
   *             nothing is run.
   */
  override def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      Console.err.println(
        "Usage: CommoncrawlArchiveLinesCounter MASTER_URL HDFS_INPUT_DIR INITIAL_EXECUTORS")
    } else {
      val conf = new SparkConf()
        .setAppName("CommoncrawlArchiveLinesCounter")
        .setMaster(args(0))
        .set("spark.executor.memory", "6g")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)
      try {
        val nLines = sc
          .textFile(args(1) + "/*")
          .mapPartitionsWithIndex { (index, lines) =>
            // Runs inside the task, once per partition, before its lines are consumed.
            println("Processing partition %s".format(index))
            lines
          }
          .count()
        println(nLines)
      } finally {
        // Release the application's cluster resources even if the job fails.
        sc.stop()
      }
    }
  }
}
SAIF C,能详细解释一下吗?
我已经通过从 Spark 的独立模式切换到 YARN 来解决问题。
相关主题: