List in the Case-When Statement in Spark SQL
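I'm trying to convert a DataFrame from long to wide format, following the approach suggested in the linked post.
However, Spark SQL seems to interpret the country names in my list as columns of the table. Below are the messages I get in the console, along with the sample data and the code from the link above. Does anyone know how to fix this?
Messages from the Scala console: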
scala> val myDF1 = sqlc2.sql(query)
org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value;
I have:
id  tag  value
1   US   50
1   UK   100
1   Can  125
2   US   75
2   UK   150
2   Can  175
and I want:
id  US  UK   Can
1   50  100  125
2   75  150  175
I can create a list of the values I want to pivot on and then build a string containing the SQL query I need:
val countries = List("US", "UK", "Can")
val numCountries = countries.length - 1
var query = "select *, "
for (i <- 0 to numCountries-1) {
  query += "case when tag = " + countries(i) + " then value else 0 end as " + countries(i) + ", "
}
query += "case when tag = " + countries.last + " then value else 0 end as " + countries.last + " from myTable"
myDataFrame.registerTempTable("myTable")
val myDF1 = sqlContext.sql(query)
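Printing the generated string shows what the SQL parser actually receives. The sketch below rebuilds the question's query in plain Scala, without Spark, using `map`/`mkString` instead of the `var` and index loop; the object name `UnquotedQueryDemo` is made up for illustration. It produces exactly the same string as the loop above.

```scala
// Hypothetical demo object: rebuilds the question's query string without Spark.
object UnquotedQueryDemo {
  val countries = List("US", "UK", "Can")

  // Same text as the question's for loop: one CASE per country,
  // separated by ", ", with the tag value pasted in WITHOUT quotes.
  val query: String =
    "select *, " +
      countries
        .map(c => "case when tag = " + c + " then value else 0 end as " + c)
        .mkString(", ") +
      " from myTable"

  def main(args: Array[String]): Unit = println(query)
}
```

The output contains `case when tag = US then value ...` with `US` unquoted, so the parser reads `US` as an identifier (a column name), which matches the `cannot resolve 'US'` error.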
The country codes are string literals and must be quoted; otherwise the SQL parser treats them as column names:
val caseClause = countries.map(
x => s"""CASE WHEN tag = '$x' THEN value ELSE 0 END as $x"""
).mkString(", ")
val aggClause = countries.map(x => s"""SUM($x) AS $x""").mkString(", ")
val query = s"""
SELECT id, $aggClause
FROM (SELECT id, $caseClause FROM myTable) tmp
GROUP BY id"""
sqlContext.sql(query)
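Wrapping the string building from this answer in a pure function makes the generated SQL easy to inspect and test without a SparkSession. This is a minimal sketch; the object `PivotQuery` and its parameters are hypothetical, not part of the answer or of any Spark API.

```scala
// Hypothetical helper: the answer's query builder as a pure function,
// parameterised on the table and column names.
object PivotQuery {
  def build(table: String, idCol: String, tagCol: String, valueCol: String,
            tags: Seq[String]): String = {
    // Inner query: one column per tag; the tag value is a QUOTED string literal.
    val caseClause = tags
      .map(t => s"CASE WHEN $tagCol = '$t' THEN $valueCol ELSE 0 END AS $t")
      .mkString(", ")
    // Outer query: collapse the rows of each id by summing every tag column.
    val aggClause = tags.map(t => s"SUM($t) AS $t").mkString(", ")
    s"SELECT $idCol, $aggClause FROM (SELECT $idCol, $caseClause FROM $table) tmp GROUP BY $idCol"
  }

  def main(args: Array[String]): Unit =
    println(build("myTable", "id", "tag", "value", Seq("US", "UK", "Can")))
}
```

The result can be passed to `sqlContext.sql(...)` exactly as in the answer. The tag values are interpolated without escaping, so this is only safe for trusted values; a tag containing a single quote would break the query.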
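That said, why bother building SQL strings from scratch at all? The same logic can be written directly with the DataFrame API: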
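import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, sum, when}
import sqlContext.implicits._ // enables the $"colName" syntax
// One column per country: value when tag matches, 0 otherwise
def genCase(x: String) = {
  when($"tag" <=> lit(x), $"value").otherwise(0).alias(x)
}
// Apply aggregate function f to column x, keeping the name x
def genAgg(f: Column => Column)(x: String) = f(col(x)).alias(x)
// df is the input DataFrame (myDataFrame in the question)
df
  .select($"id" :: countries.map(genCase): _*)
  .groupBy($"id")
  // agg(expr, exprs*) needs a separate first argument; the dummy column fills it
  .agg($"id".alias("dummy"), countries.map(genAgg(sum)): _*)
  .drop("dummy")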
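To make the semantics of this conditional aggregation concrete, here is a small model using plain Scala collections instead of Spark, run on the sample data from the question. It is only an illustration of what the query computes, not how Spark executes it; the names `PivotModel`, `Row` and `caseColumns` are hypothetical.

```scala
// Plain Scala collections model (no Spark) of the pivot:
// for each id, SUM(CASE WHEN tag = x THEN value ELSE 0 END) for every country x.
object PivotModel {
  final case class Row(id: Int, tag: String, value: Int)

  val rows = Seq(
    Row(1, "US", 50), Row(1, "UK", 100), Row(1, "Can", 125),
    Row(2, "US", 75), Row(2, "UK", 150), Row(2, "Can", 175)
  )
  val countries = List("US", "UK", "Can")

  // "select" step: one value per country, 0 when the tag does not match.
  def caseColumns(r: Row): List[Int] =
    countries.map(c => if (r.tag == c) r.value else 0)

  // "groupBy + sum" step: add up the per-row vectors element-wise for each id.
  def pivot(input: Seq[Row]): Map[Int, List[Int]] =
    input.groupBy(_.id).map { case (id, rs) =>
      id -> rs.map(caseColumns).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    }

  def main(args: Array[String]): Unit =
    pivot(rows).toSeq.sortBy(_._1).foreach { case (id, vs) =>
      println((id :: vs).mkString(" "))
    }
}
```

Running `main` prints the two wide rows from the question (`1 50 100 125` and `2 75 150 175`). On Spark 1.6 or later the same reshaping is also available built in as `df.groupBy("id").pivot("tag", countries).sum("value")`; note that missing id/tag combinations then come out as null rather than 0.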