如何迭代分组行以在 spark 结构化流中生成多行?
How to iterate grouped rows to produce multiple rows in spark structured streaming?
我的输入数据集如下:
id operation value
1 null 1
1 discard 0
2 null 1
2 null 2
2 max 0
3 null 1
3 null 1
3 list 0
我想根据 "operation" 列对输入进行分组并生成行。
对于第1组,operation="discard",则输出为null,
对于第2组,操作="max",输出为:
2 null 2
对于第3组,操作="list",输出为:
3 null 1
3 null 1
所以最后的输出是这样的:
id operation value
2 null 2
3 null 1
3 null 1
有解决办法吗?
我知道有一个类似的问题how-to-iterate-grouped-data-in-spark
但与之相比的区别是:
- 我想为每个分组数据生成多行。可能的
以及如何?
- 我希望我的逻辑能够轻松扩展,以便将来添加更多操作。所以用户定义的聚合函数(又名 UDAF)是
唯一可能的解决方案?
更新 1:
感谢stack0114106,然后根据他的回答更详细,例如对于 id=1,operation="max",我想迭代所有 id=2 的项目,并找到最大值,而不是分配一个硬编码值,这就是为什么我想迭代每个中的行团体。下面是一个更新的例子:
输入:
scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id"
,"operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|0 |null |1 |
|0 |discard |0 |
|1 |null |1 |
|1 |null |2 |
|1 |max |0 |
|2 |null |1 |
|2 |null |3 |
|2 |max |0 |
|3 |null |1 |
|3 |null |1 |
|3 |list |0 |
+---+---------+-----+
预期输出:
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |2 |
|2 |null |3 |
|3 |null |1 |
|3 |null |1 |
+---+---------+-----+
您可以对数据框使用 flatMap 操作,并根据您提到的条件生成所需的行。看看这个
scala> val df = Seq((1,null,1),(1,"discard",0),(2,null,1),(2,null,2),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |1 |
|1 |discard |0 |
|2 |null |1 |
|2 |null |2 |
|2 |max |0 |
|3 |null |1 |
|3 |null |1 |
|3 |list |0 |
+---+---------+-----+
scala> df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0
until s._1).map( i => (r.getInt(0),null,s._2) ) }).show(false)
+---+----+---+
|_1 |_2 |_3 |
+---+----+---+
|2 |null|2 |
|3 |null|1 |
|3 |null|1 |
+---+----+---+
Spark 分配 _1、_2 等。因此您可以通过如下分配将它们映射到实际名称
scala> val df2 = df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0 until s._1).map( i => (r.getInt(0),null,s._2) ) }).toDF("id","operation","value")
df2: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df2.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|2 |null |2 |
|3 |null |1 |
|3 |null |1 |
+---+---------+-----+
scala>
编辑 1:
由于您需要每个 id 的最大值(值),您可以使用 window 函数并在新列中获取最大值,然后使用相同的技术并获得结果。看看这个
scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.createOrReplaceTempView("michael")
scala> val df2 = spark.sql(""" select *, max(value) over(partition by id) mx from michael """)
df2: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 2 more fields]
scala> df2.show(false)
+---+---------+-----+---+
|id |operation|value|mx |
+---+---------+-----+---+
|1 |null |1 |2 |
|1 |null |2 |2 |
|1 |max |0 |2 |
|3 |null |1 |1 |
|3 |null |1 |1 |
|3 |list |0 |1 |
|2 |null |1 |3 |
|2 |null |3 |3 |
|2 |max |0 |3 |
|0 |null |1 |1 |
|0 |discard |0 |1 |
+---+---------+-----+---+
scala> val df3 = df2.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => 0 case "max" => 1 case "list" => 2 } ; (0 until s).map( i => (r.getInt(0),null,r.getInt(3) )) }).toDF("id","operation","value")
df3: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df3.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |2 |
|3 |null |1 |
|3 |null |1 |
|2 |null |3 |
+---+---------+-----+
scala>
将收集值的所有内容分组,然后为每个操作编写逻辑:
import org.apache.spark.sql.functions._
val grouped=df.groupBy($"id").agg(max($"operation").as("op"),collect_list($"value").as("vals"))
val maxs=grouped.filter($"op"==="max").withColumn("val",explode($"vals")).groupBy($"id").agg(max("val").as("value"))
val lists=grouped.filter($"op"==="list").withColumn("value",explode($"vals")).filter($"value"!==0).select($"id",$"value")
//we don't collect the "discard"
//and we can add additional subsets for new "operations"
val result=maxs.union(lists)
//if you need the null in "operation" column add it with withColumn
我的输入数据集如下:
id operation value
1 null 1
1 discard 0
2 null 1
2 null 2
2 max 0
3 null 1
3 null 1
3 list 0
我想根据 "operation" 列对输入进行分组并生成行。
对于第1组,operation="discard",则输出为null,
对于第2组,操作="max",输出为:
2 null 2
对于第3组,操作="list",输出为:
3 null 1
3 null 1
所以最后的输出是这样的:
id operation value
2 null 2
3 null 1
3 null 1
有解决办法吗?
我知道有一个类似的问题how-to-iterate-grouped-data-in-spark 但与之相比的区别是:
- 我想为每个分组数据生成多行。可能的 以及如何?
- 我希望我的逻辑能够轻松扩展,以便将来添加更多操作。所以用户定义的聚合函数(又名 UDAF)是 唯一可能的解决方案?
更新 1:
感谢stack0114106,然后根据他的回答更详细,例如对于 id=1,operation="max",我想迭代所有 id=2 的项目,并找到最大值,而不是分配一个硬编码值,这就是为什么我想迭代每个中的行团体。下面是一个更新的例子:
输入:
scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id"
,"operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|0 |null |1 |
|0 |discard |0 |
|1 |null |1 |
|1 |null |2 |
|1 |max |0 |
|2 |null |1 |
|2 |null |3 |
|2 |max |0 |
|3 |null |1 |
|3 |null |1 |
|3 |list |0 |
+---+---------+-----+
预期输出:
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |2 |
|2 |null |3 |
|3 |null |1 |
|3 |null |1 |
+---+---------+-----+
您可以对数据框使用 flatMap 操作,并根据您提到的条件生成所需的行。看看这个
scala> val df = Seq((1,null,1),(1,"discard",0),(2,null,1),(2,null,2),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |1 |
|1 |discard |0 |
|2 |null |1 |
|2 |null |2 |
|2 |max |0 |
|3 |null |1 |
|3 |null |1 |
|3 |list |0 |
+---+---------+-----+
scala> df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0
until s._1).map( i => (r.getInt(0),null,s._2) ) }).show(false)
+---+----+---+
|_1 |_2 |_3 |
+---+----+---+
|2 |null|2 |
|3 |null|1 |
|3 |null|1 |
+---+----+---+
Spark 分配 _1、_2 等。因此您可以通过如下分配将它们映射到实际名称
scala> val df2 = df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0 until s._1).map( i => (r.getInt(0),null,s._2) ) }).toDF("id","operation","value")
df2: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df2.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|2 |null |2 |
|3 |null |1 |
|3 |null |1 |
+---+---------+-----+
scala>
编辑 1:
由于您需要每个 id 的最大值(值),您可以使用 window 函数并在新列中获取最大值,然后使用相同的技术并获得结果。看看这个
scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.createOrReplaceTempView("michael")
scala> val df2 = spark.sql(""" select *, max(value) over(partition by id) mx from michael """)
df2: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 2 more fields]
scala> df2.show(false)
+---+---------+-----+---+
|id |operation|value|mx |
+---+---------+-----+---+
|1 |null |1 |2 |
|1 |null |2 |2 |
|1 |max |0 |2 |
|3 |null |1 |1 |
|3 |null |1 |1 |
|3 |list |0 |1 |
|2 |null |1 |3 |
|2 |null |3 |3 |
|2 |max |0 |3 |
|0 |null |1 |1 |
|0 |discard |0 |1 |
+---+---------+-----+---+
scala> val df3 = df2.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => 0 case "max" => 1 case "list" => 2 } ; (0 until s).map( i => (r.getInt(0),null,r.getInt(3) )) }).toDF("id","operation","value")
df3: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df3.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |2 |
|3 |null |1 |
|3 |null |1 |
|2 |null |3 |
+---+---------+-----+
scala>
将收集值的所有内容分组,然后为每个操作编写逻辑:
import org.apache.spark.sql.functions._
val grouped=df.groupBy($"id").agg(max($"operation").as("op"),collect_list($"value").as("vals"))
val maxs=grouped.filter($"op"==="max").withColumn("val",explode($"vals")).groupBy($"id").agg(max("val").as("value"))
val lists=grouped.filter($"op"==="list").withColumn("value",explode($"vals")).filter($"value"!==0).select($"id",$"value")
//we don't collect the "discard"
//and we can add additional subsets for new "operations"
val result=maxs.union(lists)
//if you need the null in "operation" column add it with withColumn