scala.MatchError: in Dataframes

scala.MatchError: in Dataframes

我有一份 Spark (version 1.3.1) 申请。其中,我试图将一个 Java bean RDD JavaRDD<Message> 转换为 Dataframe,它有许多具有不同数据类型(整数、字符串、列表、映射、双精度)的字段。

但是,当我执行我的代码时。

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
            @Override
            public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
                SQLContext sqlContext = SparkConnection.getSqlContext();
                DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
                df.registerTempTable("messages");

我遇到了这个错误

/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema.apply(SQLContext.scala:1193)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema.apply(SQLContext.scala:1192)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1192)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:437)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:465)

如果 Message 有许多不同的字段,例如 List,并且错误消息指向 List 匹配错误,那么问题就出在这里。此外,如果您查看 the source code,您会发现 List 不在匹配项中。

但是除了在源代码中挖掘之外,这在文档 here under the Java tab:

中也非常清楚地说明

Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays.

您可能想切换到 Scala,因为那里似乎支持它:

Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table.

所以解决方案是使用 Scala 或从您的 JavaBean 中删除 List

作为最后的手段,您可以查看 SQLUserDefinedType 来定义 List 应该如何持久化,也许可以将其组合在一起。

我通过将我的 Spark 版本从 1.3.1 更新到 1.4.0 解决了这个问题。现在,它工作文件。