如何根据某些条件为 spark 数据框中的记录分配等级?
How to assign ranks to records in a spark dataframe based on some conditions?
给定一个数据框:
+-------+-------+
| A | B |
+-------+-------+
| a| 1|
+-------+-------+
| b| 2|
+-------+-------+
| c| 5|
+-------+-------+
| d| 7|
+-------+-------+
| e| 11|
+-------+-------+
我想根据条件为记录分配排名:
- 排名从 1 开始
- 如果(当前记录的 B - 先前记录的 B)<= 2,则分配排名 = 先前记录的排名
- 当(当前记录的 B - 先前记录的 B)> 2 时递增排名
所以我希望结果是这样的:
+-------+-------+------+
| A | B | rank |
+-------+-------+------+
| a| 1| 1|
+-------+-------+------+
| b| 2| 1|
+-------+-------+------+
| c| 5| 2|
+-------+-------+------+
| d| 7| 2|
+-------+-------+------+
| e| 11| 3|
+-------+-------+------+
- spark 中的内置函数,如 rowNumber、rank,dense_rank 没有
提供实现此目的的任何功能。
- 我尝试通过使用全局变量排名和获取来做到这一点
使用滞后函数的先前记录值,但它不给出
与 sql.
不同,spark 中的分布式处理导致结果一致
我尝试的另一种方法是将记录的滞后值传递给 UDF,同时生成新列并在 UDF 中应用条件。但我面临的问题是我可以获得 A 列和 B 列的滞后值,但不能获得列等级的滞后值。
这会产生错误,因为它无法解析列名等级:
HiveContext.sql("SELECT df.*,LAG(df.rank, 1) OVER (ORDER BY B , 0) AS rank_lag, udfGetVisitNo(B,rank_lag) as rank FROM df")
我无法获得我当前添加的列的滞后值。
此外,我不想要需要使用 df.collect() 的方法,因为此数据帧的大小非常大,将其收集在单个工作节点上会导致内存错误。
还有其他方法可以达到同样的效果吗?
我想知道一个时间复杂度为 O(n) 的解决方案,n 是记录数。
一个 SQL 的解决方案是
select a,b,1+sum(col) over(order by a) as rnk
from
(
select t.*
,case when b - lag(b,1,b) over(order by a) <= 2 then 0 else 1 end as col
from t
) x
解决方案假定排序基于列 a
。
给定一个数据框:
+-------+-------+
| A | B |
+-------+-------+
| a| 1|
+-------+-------+
| b| 2|
+-------+-------+
| c| 5|
+-------+-------+
| d| 7|
+-------+-------+
| e| 11|
+-------+-------+
我想根据条件为记录分配排名:
- 排名从 1 开始
- 如果(当前记录的 B - 先前记录的 B)<= 2,则分配排名 = 先前记录的排名
- 当(当前记录的 B - 先前记录的 B)> 2 时递增排名
所以我希望结果是这样的:
+-------+-------+------+
| A | B | rank |
+-------+-------+------+
| a| 1| 1|
+-------+-------+------+
| b| 2| 1|
+-------+-------+------+
| c| 5| 2|
+-------+-------+------+
| d| 7| 2|
+-------+-------+------+
| e| 11| 3|
+-------+-------+------+
- spark 中的内置函数,如 rowNumber、rank,dense_rank 没有 提供实现此目的的任何功能。
- 我尝试通过使用全局变量排名和获取来做到这一点 使用滞后函数的先前记录值,但它不给出 与 sql. 不同,spark 中的分布式处理导致结果一致
我尝试的另一种方法是将记录的滞后值传递给 UDF,同时生成新列并在 UDF 中应用条件。但我面临的问题是我可以获得 A 列和 B 列的滞后值,但不能获得列等级的滞后值。 这会产生错误,因为它无法解析列名等级:
HiveContext.sql("SELECT df.*,LAG(df.rank, 1) OVER (ORDER BY B , 0) AS rank_lag, udfGetVisitNo(B,rank_lag) as rank FROM df")
我无法获得我当前添加的列的滞后值。
此外,我不想要需要使用 df.collect() 的方法,因为此数据帧的大小非常大,将其收集在单个工作节点上会导致内存错误。
还有其他方法可以达到同样的效果吗? 我想知道一个时间复杂度为 O(n) 的解决方案,n 是记录数。
一个 SQL 的解决方案是
select a,b,1+sum(col) over(order by a) as rnk
from
(
select t.*
,case when b - lag(b,1,b) over(order by a) <= 2 then 0 else 1 end as col
from t
) x
解决方案假定排序基于列 a
。