Spark Machine Learning: RDD becomes unreadable
I am trying to feed the Vector data type to the mllib function in Spark called Word2Vec. Since Word2Vec returns a DataFrame whose "result" column contains the desired Vector, some extra code is needed. Now that the code finally runs successfully in Spark, I tried to use .foreach to println a few lines. Spark crashes at this step with the following error: NullPointerException. If I remove the println call, the code runs fine. I also tried the RDD's sample method, but got the same Spark error. The RDD somehow becomes unreadable.
For background on this ML task, please refer to this link.
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import scala.util.{Success, Try}
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val input_labelled = labelledTweets
  .map(t => (t._1, word2VecModel2.transform(t._2.toDF("text"))
    .select("result").first().getAs[org.apache.spark.ml.linalg.Vector](0)))
  .map(x => new LabeledPoint(x._1.toDouble, x._2))

input_labelled.take(3).foreach(println)
documentDF2: org.apache.spark.sql.DataFrame = [value: array<string>]
word2Vec2: org.apache.spark.ml.feature.Word2Vec = w2v_643337d9029a
word2VecModel2: org.apache.spark.ml.feature.Word2VecModel = w2v_643337d9029a
input_labelled: org.apache.spark.rdd.RDD[org.apache.spark.ml.feature.LabeledPoint] = MapPartitionsRDD[52] at map at <console>:74
input1: org.apache.spark.sql.DataFrame = [value: array<string>]
model2: org.apache.spark.ml.linalg.Vector = [9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
[9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 159, sandbox-hdp.hortonworks.com, executor 1): java.lang.NullPointerException
at $anonfun.apply(<console>:73)
at $anonfun.apply(<console>:73)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
... 58 elided
Caused by: java.lang.NullPointerException
at $anonfun.apply(<console>:73)
at $anonfun.apply(<console>:73)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
Most likely one of your inputs contains a null field. Spark evaluates lazily, so until you run take(3) no computation has actually happened, which is why the error disappears as soon as you remove that line.
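As a first check you can drop records with null fields before mapping over them. A minimal sketch, assuming labelledTweets is an RDD of (label, tokens) pairs (its element type is not shown in the question):

// Hypothetical guard: keep only records where both fields are present.
val cleanedTweets = labelledTweets.filter {
  case (label, tokens) => label != null && tokens != null
}
// Like the map, this filter only executes when an action
// (take, foreach, count, ...) forces evaluation.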
Also, it would be more typical (and probably faster) to convert the RDD to a DataFrame and then apply the transformer once, instead of calling it per record inside a map.
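A minimal sketch of that approach, assuming labelledTweets is an RDD[(String, Seq[String])] and that word2VecModel2 was fitted with inputCol "text" and outputCol "result" (as the select("result") in your code suggests):

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// In spark-shell, spark.implicits._ is already in scope; otherwise import it
// to get toDF on RDDs of tuples.
val tweetsDF = labelledTweets.toDF("label", "text")

// One distributed transform over the whole DataFrame, instead of one
// driver-side transform per record inside a map.
val vectorised = word2VecModel2.transform(tweetsDF)

val input_labelled = vectorised.select("label", "result").rdd.map {
  case Row(label: String, features: Vector) =>
    LabeledPoint(label.toDouble, features)
}

input_labelled.take(3).foreach(println)

This way the model is applied in a single Spark job, and the null check above can be done on the DataFrame as well (e.g. with na.drop) before converting back to an RDD.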