Apache Spark SQL 标识符预期异常

Apache Spark SQL identifier expected exception

我的问题与这个问题非常相似:Apache Spark SQL issue : java.lang.RuntimeException: [1.517] failure: identifier expected 但我就是想不通我的问题出在哪里。我使用 SQLite 作为数据库后端。连接和简单的 select 语句工作正常。

违规行:

val df = tableData.selectExpr(tablesMap(t).toSeq:_*).map(r => myMapFunc(r))

tablesMap 包含作为键的 table 名称和作为表达式的字符串数组。打印后,数组如下所示:

WrappedArray([My Col A], [ColB] || [Col C] AS ColB)

table 名称也包含在方括号中,因为它包含空格。我得到的异常:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: identifier expected

我已经确保不使用任何 Spark Sql 关键字。在我看来,此代码失败的可能原因有 2 个:1) 我以某种方式错误地处理了列名中的空格。 2) 我处理连接错误。

我正在使用类似 CSV 的资源文件,其中包含我希望在 table 上计算的表达式。除了这个文件,我还想允许用户在运行时指定额外的 tables 和它们各自的列表达式。该文件如下所示:

TableName,`Col A`,`ColB`,CONCAT(`ColB`, ' ', `Col C`)

显然这不起作用。不过我想重用这个文件,当然要修改。我的想法是将带有字符串数组表达式的列映射到一系列火花列,就像现在一样。 (这是我能想到的唯一解决方案,因为我想避免只为这个功能引入所有配置单元依赖项。)我会为我的表达式引入一个小语法,用 [=16= 标记原始列名] 以及 concatas 等函数的一些关键字。但是我怎么能这样做呢?我试过这样的东西,但它离编译还很远。

def columnsMapFunc( expr: String) : Column = {
    if(expr(0) == '$')
        return expr.drop(1)
    else
        return concat(extractedColumnNames).as(newName)
}

一般来说,使用包含空格的名称会带来问题,但用反引号替换方括号应该可以解决问题:

val df = sc.parallelize(Seq((1,"A"), (2, "B"))).toDF("f o o", "b a r")
df.registerTempTable("foo bar")

df.selectExpr("`f o o`").show

// +-----+
// |f o o|
// +-----+
// |    1|
// |    2|
// +-----+

sqlContext.sql("SELECT `b a r` FROM `foo bar`").show

// +-----+
// |b a r|
// +-----+
// |    A|
// |    B|
// +-----+

对于串联,您必须使用 concat 函数:

df.selectExpr("""concat(`f o o`, " ", `b a r`)""").show

// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+

但在 Spark 1.4.0 中需要 HiveContext

实际上我会在加载数据后简单地重命名列

df.toDF("foo", "bar")
// org.apache.spark.sql.DataFrame = [foo: int, bar: string]

并使用函数而不是表达式字符串(concat 函数仅在 Spark >= 1.5.0 中可用,对于 1.4 及更早版本,您需要一个 UDF):

import org.apache.spark.sql.functions.concat

df.select($"f o o", concat($"f o o", lit(" "), $"b a r")).show

// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+

还有一个concat_ws函数将分隔符作为第一个参数:

df.selectExpr("""concat_ws(" ", `f o o`, `b a r`)""")
df.select($"f o o", concat_ws(" ", $"f o o", $"b a r"))