Spark & Scala - RDD遍历中的NullPointerException

Spark & Scala - NullPointerException in RDD traversal

我有一些 CSV 文件,需要通过文件名的一部分将它们组合成一个 RDD。

例如,对于以下文件

$ ls   
20140101_1.csv  20140101_3.csv  20140201_2.csv  20140301_1.csv 
20140301_3.csv 20140101_2.csv  20140201_1.csv  20140201_3.csv 

我需要将名称为 20140101*.csv 的文件合并到一个 RDD 中以进行处理等等。

我正在使用 sc.wholeTextFiles 读取整个目录,然后将文件名按模式分组以形成一串文件名。 然后我将字符串传递给 sc.textFile 以将文件作为单个 RDD 打开。

这是我的代码-

val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey

data.map { a =>
  var name = a._2.mkString(",")
  (a._1, name)
}

data.foreach { a =>
  var file = sc.textFile(a._2)
  println(file.count)
}

当我尝试调用 textFile 时得到 SparkException - NullPointerException。错误栈指的是RDD内部的一个Iterator。我无法理解错误 -

15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:33)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:32)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:870)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1765)

但是,当我在 spark shell 中执行 sc.textFile(data.first._2).count 时,我能够形成 RDD 并能够检索计数。

非常感谢任何帮助。

将评论转换为答案:

var file = sc.textFile(a._2)

在另一个 RDD 的 foreach 中是行不通的。你不能像那样嵌套 RDD。