Pyspark - 排名列保持联系

Pyspark - Ranking columns keeping ties

我正在寻找一种方法来对保留联系的数据框的列进行排名。专门针对此示例,我有一个 pyspark 数据框,如下所示,我想在其中为 colA 和 colB 生成排名(尽管我想支持能够对 N 列进行排名)

 +--------+----------+-----+----+
 |  Entity|        id| colA|colB|
 +-------------------+-----+----+
 |       a|8589934652|   21|  50|
 |       b|       112|    9|  23|
 |       c|8589934629|    9|  23|
 |       d|8589934702|    8|  21|         
 |       e|        20|    2|  21|        
 |       f|8589934657|    2|   5|          
 |       g|8589934601|    1|   5|         
 |       h|8589934653|    1|   4|          
 |       i|8589934620|    0|   4|          
 |       j|8589934643|    0|   3|         
 |       k|8589934618|    0|   3|         
 |       l|8589934602|    0|   2|         
 |       m|8589934664|    0|   2|         
 |       n|        25|    0|   1|         
 |       o|        67|    0|   1|         
 |       p|8589934642|    0|   1|         
 |       q|8589934709|    0|   1|         
 |       r|8589934660|    0|   1|         
 |       s|        30|    0|   1|         
 |       t|        55|    0|   1|         
 +--------+----------+-----+----+

我想要的是一种对该数据框进行排名的方法,其中绑定值获得相同的排名,例如:

 +--------+----------+-----+----+---------+---------+
 |  Entity|        id| colA|colB|colA_rank|colB_rank|
 +-------------------+-----+----+---------+---------+
 |       a|8589934652|   21|  50|        1|        1|
 |       b|       112|    9|  23|        2|        2|
 |       c|8589934629|    9|  21|        2|        3|
 |       d|8589934702|    8|  21|        3|        3|        
 |       e|        20|    2|  21|        4|        3|      
 |       f|8589934657|    2|   5|        4|        4|       
 |       g|8589934601|    1|   5|        5|        4|     
 |       h|8589934653|    1|   4|        5|        5|     
 |       i|8589934620|    0|   4|        6|        5|    
 |       j|8589934643|    0|   3|        6|        6|  
 |       k|8589934618|    0|   3|        6|        6| 
 |       l|8589934602|    0|   2|        6|        7|
 |       m|8589934664|    0|   2|        6|        7|
 |       n|        25|    0|   1|        6|        8|
 |       o|        67|    0|   1|        6|        8|
 |       p|8589934642|    0|   1|        6|        8|
 |       q|8589934709|    0|   1|        6|        8|
 |       r|8589934660|    0|   1|        6|        8|
 |       s|        30|    0|   1|        6|        8|
 |       t|        55|    0|   1|        6|        8|
 +--------+----------+-----+----+---------+---------+

我当前对第一个数据帧的实现如下所示:

 def getRanks(mydf, cols=None, ascending=False):
     from pyspark import Row
     # This takes a dataframe and a list of columns to rank
     # If no list is provided, it ranks *all* columns
     # returns a new dataframe

     def addRank(ranked_rdd, col, ascending):
         # This assumes an RDD of the form (Row(...), list[...])
         # it orders the rdd by col, finds the order, then adds that to the 
         # list
         myrdd = ranked_rdd.sortBy(lambda (row, ranks):  row[col], 
                 ascending=ascending).zipWithIndex()
         return myrdd.map(lambda ((row, ranks), index): (row, ranks + 
                [index+1]))

     myrdd = mydf.rdd
     fields = myrdd.first().__fields__
     ranked_rdd = myrdd.map(lambda x: (x, []))

     if (cols is None):
         cols = fields
     for col in cols:
         ranked_rdd = addRank(ranked_rdd, col, ascending)
     rank_names = [x + "_rank" for x in cols]

     # Hack to make sure columns come back in the right order
     ranked_rdd = ranked_rdd.map(lambda (row, ranks): Row(*row.__fields__ + 
                  rank_names)(*row + tuple(ranks)))
     return ranked_rdd.toDF()

产生:

 +--------+----------+-----+----+---------+---------+
 |  Entity|        id| colA|colB|colA_rank|colB_rank|
 +-------------------+-----+----+---------+---------+
 |       a|8589934652|   21|  50|        1|        1|
 |       b|       112|    9|  23|        2|        2|
 |       c|8589934629|    9|  23|        3|        3|
 |       d|8589934702|    8|  21|        4|        4|        
 |       e|        20|    2|  21|        5|        5|      
 |       f|8589934657|    2|   5|        6|        6|       
 |       g|8589934601|    1|   5|        7|        7|     
 |       h|8589934653|    1|   4|        8|        8|     
 |       i|8589934620|    0|   4|        9|        9|    
 |       j|8589934643|    0|   3|       10|       10|  
 |       k|8589934618|    0|   3|       11|       11|
 |       l|8589934602|    0|   2|       12|       12|
 |       m|8589934664|    0|   2|       13|       13|
 |       n|        25|    0|   1|       14|       14|
 |       o|        67|    0|   1|       15|       15|
 |       p|8589934642|    0|   1|       16|       16|
 |       q|8589934709|    0|   1|       17|       17|
 |       r|8589934660|    0|   1|       18|       18|
 |       s|        30|    0|   1|       19|       19|
 |       t|        55|    0|   1|       20|       20|
 +--------+----------+-----+----+---------+---------+

如您所见,函数 getRanks() 获取一个数据框,指定要排名的列,对它们进行排序,并使用 zipWithIndex() 生成排序或排名。但是,我想不出保持关系的方法。

这个 Whosebug post 是我找到的最接近的解决方案: 但它似乎只能处理 1 列(我认为)。

非常感谢您的提前帮助!

编辑:列 'id' 是通过调用 monotonically_increasing_id() 生成的,在我的实现中被转换为字符串。

您正在寻找 dense_rank

首先让我们创建我们的数据框:

df = spark.createDataFrame(sc.parallelize([["a",8589934652,21,50],["b",112,9,23],["c",8589934629,9,23],
                ["d",8589934702,8,21],["e",20,2,21],["f",8589934657,2,5],
                ["g",8589934601,1,5],["h",8589934653,1,4],["i",8589934620,0,4],
                ["j",8589934643,0,3],["k",8589934618,0,3],["l",8589934602,0,2],
                ["m",8589934664,0,2],["n",25,0,1],["o",67,0,1],["p",8589934642,0,1],
                ["q",8589934709,0,1],["r",8589934660,0,1],["s",30,0,1],["t",55,0,1]]
), ["Entity","id","colA","colB"])

我们将定义两个 windowSpec:

from pyspark.sql import Window
import pyspark.sql.functions as psf
wA = Window.orderBy(psf.desc("colA"))
wB = Window.orderBy(psf.desc("colB"))
df = df.withColumn(
    "colA_rank", 
    psf.dense_rank().over(wA)
).withColumn(
    "colB_rank", 
    psf.dense_rank().over(wB)
)

    +------+----------+----+----+---------+---------+
    |Entity|        id|colA|colB|colA_rank|colB_rank|
    +------+----------+----+----+---------+---------+
    |     a|8589934652|  21|  50|        1|        1|
    |     b|       112|   9|  23|        2|        2|
    |     c|8589934629|   9|  23|        2|        2|
    |     d|8589934702|   8|  21|        3|        3|
    |     e|        20|   2|  21|        4|        3|
    |     f|8589934657|   2|   5|        4|        4|
    |     g|8589934601|   1|   5|        5|        4|
    |     h|8589934653|   1|   4|        5|        5|
    |     i|8589934620|   0|   4|        6|        5|
    |     j|8589934643|   0|   3|        6|        6|
    |     k|8589934618|   0|   3|        6|        6|
    |     l|8589934602|   0|   2|        6|        7|
    |     m|8589934664|   0|   2|        6|        7|
    |     n|        25|   0|   1|        6|        8|
    |     o|        67|   0|   1|        6|        8|
    |     p|8589934642|   0|   1|        6|        8|
    |     q|8589934709|   0|   1|        6|        8|
    |     r|8589934660|   0|   1|        6|        8|
    |     s|        30|   0|   1|        6|        8|
    |     t|        55|   0|   1|        6|        8|
    +------+----------+----+----+---------+---------+

我也会提出一个替代方案:

 for cols in data.columns[2:]:
     lookup = (data.select(cols)
          .distinct()
          .orderBy(cols, ascending=False)
          .rdd
          .zipWithIndex()
          .map(lambda x: x[0] + (x[1], ))
          .toDF([cols, cols+"_rank_lookup"]))

     name = cols + "_ranks"
     data = data.join(lookup, [cols]).withColumn(name,col(cols+"_rank_lookup") 
            + 1).drop(cols + "_rank_lookup")

不如 dense_rank() 优雅,我不确定对性能的影响。