How to take row_number() based on a condition in spark with scala

I have the following dataframe -

+----+-----+---+
| val|count| id|
+----+-----+---+
|   a|   10| m1|
|   b|   20| m1|
|null|   30| m1|
|   b|   30| m2|
|   c|   40| m2|
|null|   50| m2|
+----+-----+---+

created using -

val df1 = Seq(
  ("a","10","m1"),
  ("b","20","m1"),
  (null,"30","m1"),
  ("b","30","m2"),
  ("c","40","m2"),
  (null,"50","m2")
).toDF("val","count","id")

I am trying to rank these records using row_number() and a window function, as below.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

df1.withColumn("rannk_num", row_number() over Window.partitionBy("id").orderBy("count")).show
+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|null|   30| m1|        3|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   50| m2|        3|
+----+-----+---+---------+

But I have to filter out those records where the column 'val' is null.

Expected output --

+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|null|   30| m1|     NULL|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   50| m2|     NULL|
+----+-----+---+---------+

Wondering if this is possible with minimal change. Also, the 'val' and 'count' columns can have 'n' number of values.

Filter out the rows where 'val' is null, assign them a null row number, and union them back to the original dataframe.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val df1 = Seq(
  ("a","10","m1"),
  ("b","20","m1"),
  (null,"30","m1"),
  ("b","30","m2"),
  ("c","40","m2"),
  (null,"50","m2")
).toDF("val","count","id")

// Rank only the non-null rows, give the null rows a null rank, then union back.
df1.filter("val is not null").withColumn(
    "rannk_num", row_number() over Window.partitionBy("id").orderBy("count")
).union(
    df1.filter("val is null").withColumn("rannk_num", lit(null))
).show
+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   30| m1|     null|
|null|   50| m2|     null|
+----+-----+---+---------+
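
If you want to avoid the union (which appends the null rows at the end), a single-pass alternative is possible. This is just a sketch, not from the answer above: add the null flag to the window's partition key so the non-null rows are still ranked 1..n among themselves per id, then keep the rank only for non-null rows with when (a when with no otherwise yields null for the remaining rows).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, when}

// Null and non-null rows fall into separate partitions, so the non-null
// rows within each id are ranked 1..n by count, exactly as before.
val w = Window.partitionBy(col("id"), col("val").isNull).orderBy("count")

df1.withColumn("rannk_num", when(col("val").isNotNull, row_number().over(w))).show

Since no rows are split off and unioned back, the null-val rows stay interleaved with the others instead of being pushed to the bottom of the result.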