Pyspark - 过滤数据框并创建排名列

Pyspark - filter dataframe and create rank columns

我有一种情况,我想根据不同的条件在数据框中创建排名列,并将第一排名设置为 true,将其他排名设置为 false。下面是一个示例数据框:

Column1    Column2   Column3   Column4
ABC        X1        null      2016-08-21 11:31:08
ABC        X1        Test      2016-08-22 11:31:08
ABC        X1        null      2016-08-20 11:31:08
PQR        X1        Test      2016-08-23 11:31:08
PQR        X1        Test      2016-08-24 11:31:08
PQR        X1        null      2016-08-24 11:31:08

这里我想根据以下条件创建排名列:

Rank1: 为 Column2 为 X1 且 Column3 为空且按 Column4 排序的行计算 Column1 上的排名

Rank2: 为 Column2 为 X1 且 Column3 为测试的行计算 Column1 的排名并按 Column4 排序

所以预期的结果是:

Column1    Column2   Column3   Column4                Rank1    Rank2
ABC        X1        null      2016-08-21 11:31:08    2        null
ABC        X1        Test      2016-08-22 11:31:08    null     1
ABC        X1        null      2016-08-20 11:31:08    1        null
PQR        X1        Test      2016-08-23 11:31:08    null     1
PQR        X1        Test      2016-08-24 11:31:08    null     2
PQR        X1        null      2016-08-24 11:31:08    1        null

我尝试使用 when 来过滤掉数据,但排名不是从 1 开始的。

df = df.withColumn("Rank1", F.when((df.Column2 == 'X1') & (df.Column3.isNull()), rank().over(Window.partitionBy('Column1').orderBy('Column4')))

这确实给了我顺序,但顺序是随机的。我需要标记第一等级,所以知道它对我来说很重要。

我尝试的其他选项是在临时数据框中过滤数据并计算排名并将其连接回主数据框。但是数据帧的大小很大,并且要计算多列,因此会出现内存不足错误。非常感谢任何解决此问题的帮助。

您需要将条件添加到partitionby window的order by子句中。

这应该适合你:

condition_rank1 = (col("column2") == 'X1') & (col("column3").isNull())
condition_rank2 = (col("column2") == 'X1') & (col("column3") == 'Test')

w_rank1 = Window.partitionBy('column1').orderBy(*[when(condition_rank1, lit(1)).desc(), col("column4")])
w_rank2 = Window.partitionBy('column1').orderBy(*[when(condition_rank2, lit(1)).desc(), col("column4")])

df.withColumn("Rank1", when(condition_rank1, rank().over(w_rank1))) \
    .withColumn("Rank2", when(condition_rank2, rank().over(w_rank2))) \
    .show()