Spark RDD throwing NullPointerException
I am running into a problem when I try to fetch some products from a Hive table in Spark and then process/apply Drools rules to them.
// function which returns products from the Hive table
def getProductsList(hiveContext: org.apache.spark.sql.hive.HiveContext): scala.collection.mutable.MutableList[Product] = {
  val products = scala.collection.mutable.MutableList[Product]()
  val results = hiveContext.sql("select item_id,value from details where type_id=12")
  val collection = results.collect()
  var i = 0
  results.collect.foreach(t => {
    val product = new Product(collection(i)(0).asInstanceOf[Long], collection(i)(1).asInstanceOf[String])
    i = i + 1
    products += product
  })
  products
}
Calling the getProductsList function and applying the Drools rules to the products:
val randomProducts = this.getProductsList(hiveContext)
val rdd = ssc.sparkContext.parallelize(randomProducts)
val evaluatedProducts = rdd.mapPartitions(incomingProducts => {
  print("Hello")
  rulesExecutor.evalRules(incomingProducts)
})
val productdf = hiveContext.applySchema(evaluatedProducts, classOf[Product])
As shown above, the rdd mapPartitions iteration never happens and the following error is thrown, even though I am sure the rdd is not empty.
Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun.apply(JavaTypeInference.scala:103)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:995)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:488)
at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:1028)
at com.cloudera.sprue.ValidateEan$.main(ValidateEan.scala:70)
at com.cloudera.sprue.ValidateEan.main(ValidateEan.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/05/05 07:44:48 INFO SparkContext: Invoking stop() from shutdown hook
Please help me resolve this issue.
Since the final result we need is a DataFrame, we can work directly with the SchemaRDD (DataFrame) returned by hiveContext.sql(), instead of collecting it into a local list and re-parallelizing it:
import org.apache.spark.sql.DataFrame
import hiveContext.implicits._ // needed for .toDF() on the mapped RDD

// defining the schema
case class Product(id: Long, value: String)

// loading data from the Hive table
val results: DataFrame = hiveContext.sql("select item_id,value from details where type_id=12")

// convert each Row to a Product, pass it to rulesExecutor.evalRules(), then build a DataFrame
val evaluatedProducts = results
  .map(productRow => rulesExecutor.evalRules(Product(productRow.getLong(0), productRow.getString(1))))
  .toDF()
I am assuming that rulesExecutor.evalRules() accepts a Product. If it does not, we can pass the Row through as-is (without the explicit conversion inside map()), as in the sketch below.
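A minimal sketch of that Row-based variant, assuming evalRules can take a Row and still returns a case-class instance (such as Product) so that .toDF() applies; evalRules itself is not shown in the question, so both are assumptions:

import hiveContext.implicits._ // needed for .toDF()

// Row-based variant (sketch): no explicit conversion inside map()
val rows = hiveContext.sql("select item_id,value from details where type_id=12")
val evaluatedDf = rows
  .map(row => rulesExecutor.evalRules(row)) // assumes evalRules(Row) returns a Product
  .toDF()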