Explode (transpose?) multiple columns in Spark SQL table
I'm using Spark SQL (I mention that it's in Spark in case that affects the SQL syntax; I'm not familiar enough yet to be sure), and I have a table I'm trying to restructure. I'm getting stuck trying to transpose multiple columns at the same time.
Basically I have data that looks like:
userId someString varA varB
1 "example1" [0,2,5] [1,2,9]
2 "example2" [1,20,5] [9,null,6]
and I'd like to explode both varA and varB simultaneously (the lengths will always match), so that the final output looks like this:
userId someString varA varB
1 "example1" 0 1
1 "example1" 2 2
1 "example1" 5 9
2 "example2" 1 9
2 "example2" 20 null
2 "example2" 5 6
But I can't seem to get more than one explode(var) statement into a single command, and if I try to chain them (that is, create a temp table after the first explode command) I obviously end up with a huge number of duplicate, unnecessary rows.
Many thanks!
Spark >= 2.4
You can skip the zip udf and use the arrays_zip function:
import org.apache.spark.sql.functions.{arrays_zip, explode}

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars.varA", $"vars.varB").show
Spark < 2.4
Without a custom UDF this isn't possible. In Scala you can do something like this:
val data = sc.parallelize(Seq(
"""{"userId": 1, "someString": "example1",
"varA": [0, 2, 5], "varB": [1, 2, 9]}""",
"""{"userId": 2, "someString": "example2",
"varA": [1, 20, 5], "varB": [9, null, 6]}"""
))
val df = spark.read.json(data)
df.printSchema
// root
// |-- someString: string (nullable = true)
// |-- userId: long (nullable = true)
// |-- varA: array (nullable = true)
// | |-- element: long (containsNull = true)
// |-- varB: array (nullable = true)
// | |-- element: long (containsNull = true)
Now we can define the zip udf:
import org.apache.spark.sql.functions.{udf, explode}
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// | 1| example1| 0| 1|
// | 1| example1| 2| 2|
// | 1| example1| 5| 9|
// | 2| example2| 1| 9|
// | 2| example2| 20|null|
// | 2| example2| 5| 6|
// +------+----------+----+----+
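As a side note, Scala's Seq.zip (which backs the udf above) truncates to the shorter sequence rather than failing, so mismatched array lengths would silently drop elements. A plain-Scala sketch of what the UDF computes for a single row (no Spark needed; the values are just the example data):

```scala
// What the zip UDF computes for one row, in plain Scala.
val varA = Seq(0L, 2L, 5L)
val varB = Seq(1L, 2L, 9L)
val zipped = varA.zip(varB)
println(zipped)  // List((0,1), (2,2), (5,9))

// Caveat: zip truncates to the shorter input instead of raising an error.
val truncated = Seq(1L, 2L, 3L).zip(Seq(9L))
println(truncated)  // List((1,9))
```

If the lengths are guaranteed equal, as stated in the question, this is a non-issue; otherwise you may want to assert on the lengths inside the UDF.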
The same with raw SQL:
sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
You can also try this:
case class Input(
userId: Integer,
someString: String,
varA: Array[Integer],
varB: Array[Integer])
case class Result(
userId: Integer,
someString: String,
varA: Integer,
varB: Integer)
def getResult(row: Input): Iterable[Result] = {
  // Emit one Result per index; assumes varA and varB have the same length
  for (i <- 0 until row.varA.size)
    yield Result(row.userId, row.someString, row.varA(i), row.varB(i))
}
val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9))
val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10))
val input_df = sc.parallelize(Seq(obj1, obj2)).toDS
val res = input_df.flatMap{ row => getResult(row) }
res.show
// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// |     1|   string1|   0|   1|
// |     1|   string1|   2|   2|
// |     1|   string1|   5|   9|
// |     2|   string2|   1|   2|
// |     2|   string2|   3|   3|
// |     2|   string2|   6|  10|
// +------+----------+----+----+
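Since getResult is ordinary Scala, the flatMap logic can be sanity-checked without a Spark session. A minimal sketch with the same case classes and illustrative values:

```scala
case class Input(userId: Integer, someString: String,
                 varA: Array[Integer], varB: Array[Integer])
case class Result(userId: Integer, someString: String,
                  varA: Integer, varB: Integer)

// One Result per index, pairing varA(i) with varB(i).
def getResult(row: Input): Iterable[Result] =
  for (i <- 0 until row.varA.size)
    yield Result(row.userId, row.someString, row.varA(i), row.varB(i))

val rows = Seq(
  Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9)),
  Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10)))

val flat = rows.flatMap(getResult)
println(flat.size)  // 6
```

The Dataset flatMap above applies exactly this function, just distributed across partitions.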
This still works even when we have more than 3 columns:
case class Input(user_id: Integer, someString: String, varA: Array[Integer], varB: Array[Integer], varC: Array[String], varD: Array[String])
val obj1 = Input(1, "example1", Array(0,2,5), Array(1,2,9), Array("a","b","c"), Array("red","green","yellow"))
val obj2 = Input(2, "example2", Array(1,20,5), Array(9,null,6), Array("d","e","f"), Array("white","black","cyan"))
val obj3 = Input(3, "example3", Array(10,11,12), Array(5,8,7), Array("g","h","i"), Array("blue","pink","brown"))
val input_df = sc.parallelize(Seq(obj1, obj2, obj3)).toDS
input_df.show()
val zip = udf((a: Seq[Integer], b: Seq[Integer], c: Seq[String], d: Seq[String]) => {
  a.indices.map(i => (a(i), b(i), c(i), d(i)))
})
val output_df = input_df.withColumn("vars", explode(zip($"varA", $"varB", $"varC", $"varD"))).
select($"user_id", $"someString", $"vars._1".alias("varA"), $"vars._2".alias("varB"), $"vars._3".alias("varC"), $"vars._4".alias("varD"))
output_df.show()
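The index-based zipping used in the four-column udf can be sketched in plain Scala; extending it to even more columns just means adding parameters and tuple slots (the names and values here are illustrative):

```scala
// Index-based zip of parallel sequences, as in the multi-column UDF above.
val a = Seq(0, 2, 5)
val c = Seq("a", "b", "c")
val d = Seq("red", "green", "yellow")

// a.indices is 0 until a.size; mapping over it pairs up the i-th elements.
val rows = a.indices.map(i => (a(i), c(i), d(i)))
println(rows)  // Vector((0,a,red), (2,b,green), (5,c,yellow))
```

Unlike Seq.zip, indexing with a.indices assumes every sequence is at least as long as the first one; shorter inputs would throw an IndexOutOfBoundsException rather than truncate.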