Spark & Scala - RDD遍历中的NullPointerException
Spark & Scala - NullPointerException in RDD traversal
我有一些 CSV 文件,需要通过文件名的一部分将它们组合成一个 RDD。
例如,对于以下文件
$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv
我需要将名称为 20140101*.csv
的文件合并到一个 RDD 中以进行处理等等。
我正在使用 sc.wholeTextFiles
读取整个目录,然后将文件名按模式分组以形成一串文件名。
然后我将字符串传递给 sc.textFile 以将文件作为单个 RDD 打开。
这是我的代码-
val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey
data.map { a =>
var name = a._2.mkString(",")
(a._1, name)
}
data.foreach { a =>
var file = sc.textFile(a._2)
println(file.count)
}
当我尝试调用 textFile
时得到 SparkException - NullPointerException
。错误栈指的是RDD内部的一个Iterator。我无法理解错误 -
15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:32)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)
但是,当我在 spark shell 中执行 sc.textFile(data.first._2).count
时,我能够形成 RDD 并能够检索计数。
非常感谢任何帮助。
将评论转换为答案:
var file = sc.textFile(a._2)
在另一个 RDD 的 foreach
中是行不通的。你不能像那样嵌套 RDD。
我有一些 CSV 文件,需要通过文件名的一部分将它们组合成一个 RDD。
例如,对于以下文件
$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv
我需要将名称为 20140101*.csv
的文件合并到一个 RDD 中以进行处理等等。
我正在使用 sc.wholeTextFiles
读取整个目录,然后将文件名按模式分组以形成一串文件名。
然后我将字符串传递给 sc.textFile 以将文件作为单个 RDD 打开。
这是我的代码-
val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey
data.map { a =>
var name = a._2.mkString(",")
(a._1, name)
}
data.foreach { a =>
var file = sc.textFile(a._2)
println(file.count)
}
当我尝试调用 textFile
时得到 SparkException - NullPointerException
。错误栈指的是RDD内部的一个Iterator。我无法理解错误 -
15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:32)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)
但是,当我在 spark shell 中执行 sc.textFile(data.first._2).count
时,我能够形成 RDD 并能够检索计数。
非常感谢任何帮助。
将评论转换为答案:
var file = sc.textFile(a._2)
在另一个 RDD 的 foreach
中是行不通的。你不能像那样嵌套 RDD。