groupBy Id并获取scala中多列的多条记录
groupBy Id and get multiple records for multiple columns in scala
我有一个 spark 数据框,如下所示。
val df = Seq(("a",1,1400),("a",1,1250),("a",2,1200),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",2,500),("b",4,250),("b",4,200),("b",4,100),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
我正在使用 scala 语言来利用这个数据框并尝试获得如下所示的结果。
val df = Seq(("a",1,1400),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",4,250),("b",4,200),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
规则:按id分组,如果min(hierarchy)==1那么我取最高数量的行然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序取其中的 3 个。另一方面,如果 min(hierarchy)==2 那么我取两行最高 amount 然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序每人取 3 个。依此类推数据中的所有 ID。
感谢您的建议..
您可以使用 window 函数生成您将根据其进行过滤的条件,例如
val results = df.withColumn("minh",min("hierarchy").over(Window.partitionBy("id")))
.withColumn("rnk",rank().over(Window.partitionBy("id").orderBy(col("amount").desc())))
.withColumn(
"rn4",
when(col("hierarchy")>=4, row_number().over(
Window.partitionBy("id",when(col("hierarchy")>=4,1).otherwise(0)).orderBy(col("amount").desc())
) ).otherwise(5)
)
.filter("rnk <= minh or rn4 <=3")
.select("id","hierarchy","amount")
注意。更详细的过滤器 .filter("(rnk <= minh or rn4 <=3) and (minh in (1,2))")
以上由 window 函数生成的辅助过滤条件的临时列是
minh
:用于确定组 ID 的最小层次结构,随后 select 组中最上面的最小列数。
rnk
用于确定每组中金额最高的行
rn4
用于确定每个组中金额最高的行,层级>=4
我有一个 spark 数据框,如下所示。
val df = Seq(("a",1,1400),("a",1,1250),("a",2,1200),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",2,500),("b",4,250),("b",4,200),("b",4,100),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
我正在使用 scala 语言来利用这个数据框并尝试获得如下所示的结果。
val df = Seq(("a",1,1400),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",4,250),("b",4,200),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
规则:按id分组,如果min(hierarchy)==1那么我取最高数量的行然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序取其中的 3 个。另一方面,如果 min(hierarchy)==2 那么我取两行最高 amount 然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序每人取 3 个。依此类推数据中的所有 ID。
感谢您的建议..
您可以使用 window 函数生成您将根据其进行过滤的条件,例如
val results = df.withColumn("minh",min("hierarchy").over(Window.partitionBy("id")))
.withColumn("rnk",rank().over(Window.partitionBy("id").orderBy(col("amount").desc())))
.withColumn(
"rn4",
when(col("hierarchy")>=4, row_number().over(
Window.partitionBy("id",when(col("hierarchy")>=4,1).otherwise(0)).orderBy(col("amount").desc())
) ).otherwise(5)
)
.filter("rnk <= minh or rn4 <=3")
.select("id","hierarchy","amount")
注意。更详细的过滤器 .filter("(rnk <= minh or rn4 <=3) and (minh in (1,2))")
以上由 window 函数生成的辅助过滤条件的临时列是
minh
:用于确定组 ID 的最小层次结构,随后 select 组中最上面的最小列数。rnk
用于确定每组中金额最高的行rn4
用于确定每个组中金额最高的行,层级>=4