Spark SQL 'explode' 命令在 AWS EC2 上失败但在本地成功

Spark SQL 'explode' command failing on AWS EC2 but succeeding locally

我正在使用 Spark SQL(我提到它在 Spark 中以防影响 SQL 语法 - 我还不够熟悉,无法确定)并且我有一个 table 我正在尝试重新构建。我有一种在本地工作的方法,但是当我尝试在 AWS EC2 实例上 运行 相同的命令时,我收到一条错误报告,说我有一个 'unresolved operator'

基本上我的数据如下:

userId    someString      varA   
   1      "example1"     [0,2,5] 
   2      "example2"     [1,20,5] 

我在 varA 的 sqlContext 中使用 'explode' 命令。当我 运行 这在本地 return 正确时,但在 AWS 上它们失败了。

我可以使用以下命令重现此内容:

val data = List(
  ("1", "example1", Array(0,2,5)), ("2", "example2", Array(1,20,5)))
val distData = sc.parallelize(data)
val distTable = distData.toDF("userId", "someString", "varA")
distTable.registerTempTable("distTable_tmp")
val temp1 = sqlContext.sql("select userId, someString, varA from distTable_tmp")
val temp2 = sqlContext.sql(
  "select userId, someString, explode(varA) as varA from distTable_tmp")

在本地,temp1.show() 和 temp2.show() return 我所期望的,即:

scala> temp1.show()
+------+----------+----------+
|userId|someString|      varA|
+------+----------+----------+
|     1|  example1| [0, 2, 5]|
|     2|  example2|[1, 20, 5]|
+------+----------+----------+

scala> temp2.show()
+------+----------+----+
|userId|someString|varA|
+------+----------+----+
|     1|  example1|   0|
|     1|  example1|   2|
|     1|  example1|   5|
|     2|  example2|   1|
|     2|  example2|  20|
|     2|  example2|   5|
+------+----------+----+

但在 AWS 上,temp1 sqlContext 命令工作正常,但 temp2 失败并显示消息:

scala> val temp2 = sqlContext.sql("select userId, someString, explode(varA) as varA from distTable_tmp")
15/11/05 22:46:49 INFO parse.ParseDriver: Parsing command: select userId, someString, explode(varA) as varA from distTable_tmp
15/11/05 22:46:49 INFO parse.ParseDriver: Parse Completed
org.apache.spark.sql.AnalysisException: unresolved operator 'Project [userId#3,someString#4,HiveGenericUdtf#org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode(varA#5) AS varA#6];
...

非常感谢。

问题的根源是您在 EC2 上使用的 Spark 版本。 explode 功能已在 Spark 1.4 中引入,因此无法在 1.3.1 上使用。可以像这样使用 RDDflatMap

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val rows: RDD[Row] = distTable.rdd.flatMap(
  row => row.getAs[Seq[Int]](2).map(v => Row.fromSeq(row.toSeq :+ v)))
val newSchema = StructType(
  distTable.schema.fields :+ StructField("varA_exploded", IntegerType, true))

sqlContext.createDataFrame(rows, newSchema).show

// userId someString varA                 varA_exploded
// 1      example1   ArrayBuffer(0, 2, 5) 0            
// 1      example1   ArrayBuffer(0, 2, 5) 2            
// 1      example1   ArrayBuffer(0, 2, 5) 5            
// 2      example2   ArrayBuffer(1, 20... 1            
// 2      example2   ArrayBuffer(1, 20... 20           
// 2      example2   ArrayBuffer(1, 20... 5      

但它怀疑是否值得大惊小怪。