DataFrame filter gives NullPointerException
In Spark 1.6.0, I have a DataFrame with a column that contains job descriptions, for example:
Description
bartender
bartender
employee
taxi-driver
...
I retrieved the list of distinct values in that column:
val jobs = people.select("Description").distinct().rdd.map(r => r(0).asInstanceOf[String]).repartition(4)
Then, for each job description, I try to retrieve the people who have that job and do something with them, but I get a NullPointerException:
jobs.foreach {
ajob =>
var peoplewithjob = people.filter($"Description" === ajob)
// ... do stuff
}
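I don't understand why this happens: every job was extracted from the people DataFrame, so there should be at least one person with each job... Any hints are welcome! Here is the stack trace: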
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 206, localhost): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
at jago.Run$$anonfun$main.apply(Run.scala:89)
at jago.Run$$anonfun$main.apply(Run.scala:82)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
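This happens because Spark does not support nested actions or transformations. The function passed to jobs.foreach runs inside tasks on the executors, but a DataFrame such as people can only be used on the driver, which is why filter fails inside DataFrame.<init>. If you want to operate on the distinct values extracted from a DataFrame, you have to fetch them to the driver and iterate locally: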
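// collect brings the distinct values to the driver;
// jobs.toLocalIterator is an alternative that fetches one partition at a time
jobs.collect.foreach { ajob =>
  val peoplewithjob = people.filter($"Description" === ajob)
  // ... do stuff
}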
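A minimal, self-contained sketch of the driver-side loop, assuming Spark 1.6 (spark-core and spark-sql 1.6.x on the classpath) running in local mode. The object PerJobDriverLoop, the helpers countsPerJob and demo, the Name column and the sample rows are all hypothetical; counting the people per job merely stands in for "do stuff".

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical example object; assumes the Spark 1.6 SQLContext/DataFrame API.
object PerJobDriverLoop {
  // For every distinct job, filter `people` on the driver and count the matches.
  def countsPerJob(sqlContext: SQLContext, people: DataFrame): Map[String, Long] = {
    import sqlContext.implicits._ // enables the $"column" syntax

    // collect() brings the distinct descriptions to the driver as an Array[String].
    val jobs = people.select("Description").distinct().rdd
      .map(r => r.getString(0))
      .collect()

    // This loop runs on the driver, so using the `people` DataFrame here is allowed.
    // Each count() is an action and launches its own Spark job.
    jobs.map { ajob =>
      val peoplewithjob = people.filter($"Description" === ajob)
      ajob -> peoplewithjob.count()
    }.toMap
  }

  // Builds a small local DataFrame from hypothetical sample rows and runs the loop.
  def demo(): Map[String, Long] = {
    val sc = new SparkContext(new SparkConf().setAppName("per-job-loop").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      val people = sqlContext.createDataFrame(Seq(
        ("alice", "bartender"), ("bob", "bartender"),
        ("carol", "employee"), ("dave", "taxi-driver")
      )).toDF("Name", "Description")
      countsPerJob(sqlContext, people)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Since every iteration runs a separate filter plus an action, this is fine for a handful of distinct values but gets slow when there are many.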
Depending on what kind of transformation you apply as "do stuff", a simple groupBy
and aggregation may be a better idea:
people.groupBy($"Description").agg(...)
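For comparison, a sketch of the groupBy route under the same assumptions (Spark 1.6, local mode, hypothetical GroupByJob object, Name column and sample rows). Counting people per job is only an example aggregation; the agg(...) above is left open because it depends on what "do stuff" is.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.count

// Hypothetical example object; assumes the Spark 1.6 SQLContext/DataFrame API.
object GroupByJob {
  // A single aggregation over `people` instead of one filter per distinct job.
  def peoplePerJob(sqlContext: SQLContext, people: DataFrame): DataFrame = {
    import sqlContext.implicits._ // enables the $"column" syntax
    people.groupBy($"Description").agg(count($"Name").as("n"))
  }

  // Runs the aggregation on hypothetical sample rows and collects the result to the driver.
  def demo(): Map[String, Long] = {
    val sc = new SparkContext(new SparkConf().setAppName("group-by-job").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      val people = sqlContext.createDataFrame(Seq(
        ("alice", "bartender"), ("bob", "bartender"),
        ("carol", "employee"), ("dave", "taxi-driver")
      )).toDF("Name", "Description")
      // Result columns are (Description, n); count produces a LongType column.
      peoplePerJob(sqlContext, people).collect()
        .map(r => r.getString(0) -> r.getLong(1))
        .toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Here all groups are computed in one aggregation over the data instead of one filter job per distinct value.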