GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table
I have a Hive table in Parquet format, generated using:
create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
I can verify that it is populated -- here is a sample value:
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
I want to get this into a Spark RDD of the form:
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
Now, using spark-shell (I hit the same problem with spark-submit), I made a test RDD with these values:
scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
Using an iterator, I can convert the ArrayBuffer into a HashSet in the following new RDD:
scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87
scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
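The same conversion can also be written more compactly. A minimal sketch of an equivalent one-liner (the name tempRDD2b is introduced here only for illustration):

import scala.collection.immutable.HashSet

// Equivalent to tempRDD2 above: build the immutable HashSet directly
// from the ArrayBuffer's elements, rather than folding element by element.
val tempRDD2b = tempRDD.map { case (k, (mid, buf)) => (k, (mid, HashSet(buf: _*))) }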
However, when I try to do the exact same thing with a DataFrame from HiveContext/SQLContext, I get the following error:
scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._
scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91
scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$$anonfun$apply.apply(<console>:91)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:91)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:91)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Note that I get the same error, "GenericRowWithSchema cannot be cast to scala.Tuple2", when I run this as a compiled program via spark-submit. The program crashes at RUN TIME when it hits the conversion step; there were no compiler errors.
I find it strange that the conversion works on my artificially generated RDD "tempRDD", but not on the RDD derived from the Hive query's DataFrame. I checked, and the two RDDs have the same form:
scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
The only difference is how each one was produced in its final step. I even tried persisting, checkpointing, and materializing these RDDs before running the steps that create tempRDD2 and tempRDD3. All of them produced the same error message.
I also read through related Stack Overflow questions and Apache Spark JIRA issues, and following suggestions from those I tried casting the ArrayBuffer to an Iterator instead, but that also failed at the second step with the same error.
Does anyone know how to properly convert ArrayBuffers to HashSets in DataFrames that come from Hive tables? Since the error seems specific to the Hive table version, I'm tempted to think this is an issue with the Spark/Hive integration in SparkSQL.
Any ideas?
My Spark version is 1.3.0 (CDH).
Here are the printSchema results:
scala> tempRDDfromHive.printSchema()
root
|-- var1: integer (nullable = true)
|-- var2: string (nullable = true)
|-- var3: integer (nullable = true)
|-- var4: string (nullable = true)
|-- var5: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = true)
| | |-- b: string (nullable = true)
What you actually get in the map stage is not an ArrayBuffer[(Int, String)] but an ArrayBuffer[Row], hence the error. Ignoring the other columns, what you need is something like this:
import org.apache.spark.sql.Row
tempHiveQL.map((a: Row) =>
  a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet)
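Building on that, here is a minimal sketch of how the full target shape could be rebuilt while keeping the other columns. The column positions (0 through 4) are assumed to match the SELECT above, the values are assumed non-null as in the sample row, and the name tempRDDfromHive2 is hypothetical:

import org.apache.spark.sql.Row
import scala.collection.immutable.HashSet

// Sketch: rebuild ((Int, String), ((Int, String), HashSet[(Int, String)])),
// converting each nested Row in var5 into a tuple before building the set.
val tempRDDfromHive2 = tempHiveQL.map { (row: Row) =>
  val pairs = row.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }
  ((row.getInt(0), row.getString(1)),
   ((row.getInt(2), row.getString(3)), HashSet(pairs: _*)))
}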
It appears this problem has been fixed in Spark 1.5.0.