groupBy Id并获取scala中多列的多条记录

groupBy Id and get multiple records for multiple columns in scala

我有一个 spark 数据框,如下所示。

val df = Seq(("a",1,1400),("a",1,1250),("a",2,1200),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",2,500),("b",4,250),("b",4,200),("b",4,100),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")

我正在使用 scala 语言来利用这个数据框并尝试获得如下所示的结果。

val df = Seq(("a",1,1400),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",4,250),("b",4,200),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")

规则:按id分组,如果min(hierarchy)==1那么我取最高数量的行然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序取其中的 3 个。另一方面,如果 min(hierarchy)==2 那么我取两行最高 amount 然后我继续分析 hierarchy >= 4 并按 数量 的降序顺序每人取 3 个。依此类推数据中的所有 ID。

感谢您的建议..

您可以使用 window 函数生成您将根据其进行过滤的条件,例如

val results = df.withColumn("minh",min("hierarchy").over(Window.partitionBy("id")))
                .withColumn("rnk",rank().over(Window.partitionBy("id").orderBy(col("amount").desc())))
                .withColumn(
                    "rn4",
                    when(col("hierarchy")>=4, row_number().over(
                     Window.partitionBy("id",when(col("hierarchy")>=4,1).otherwise(0)).orderBy(col("amount").desc())
                     ) ).otherwise(5)
                )
                .filter("rnk <= minh or rn4 <=3")
                .select("id","hierarchy","amount")

注意。更详细的过滤器 .filter("(rnk <= minh or rn4 <=3) and (minh in (1,2))")

以上由 window 函数生成的辅助过滤条件的临时列是

  • minh :用于确定组 ID 的最小层次结构,随后 select 组中最上面的最小列数。
  • rnk用于确定每组中金额最高的行
  • rn4用于确定每个组中金额最高的行,层级>=4