如何比较两个数据框并在 pyspark 中添加新的标志列?

How to compare two dataframes and add new flag column in pyspark?

我通过执行以下命令创建了两个数据框。

test1 = sc.parallelize([
    ("a",1,1),
    ("b",2,2),
    ("d",4,2),
    ("e",4,1),
    ("c",3,4)]).toDF(['SID','SSection','SRank'])
test1.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
|  a|       1|    1|
|  b|       2|    2|
|  d|       4|    2|
|  e|       4|    1|
|  c|       3|    4|
+---+--------+-----+

test2=sc.parallelize([
    ("a",1,1),
    ("b",2,3),
    ("f",4,2),
    ("e",4,1),
    ("c",3,4)]).toDF(['SID','SSection','SRank'])
test2.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
|  a|       1|    1|
|  b|       2|    3|
|  f|       4|    2|
|  e|       4|    1|
|  c|       3|    4|
+---+--------+-----+

使用 test1 和 test2 数据帧我需要生成新的数据帧,它应该包含如下结果。

+---+--------+----------+------------+------------+
|SID|SSection|test1SRank|test2SRank  |      flag  |
+---+--------+----------+------------+------------+
|  a|       1|    1     |       1    | same_rank  |
|  b|       2|    2     |       3    |rank_changed|
|  d|       4|    2     |       0    |No_rank     |
|  e|       4|    1     |       1    |same_rank   |
|  c|       3|    4     |       4    |same_rank   |
|  f|       4|    0     |       2    |new_rank    |
+---+--------+----------+------------+------------+

我想通过使用 SIDSSection 列的组合比较 test1 和 test2 数据帧以及 ranks.

之间的比较来生成以上结果

例如:

1) SID (a) 和 SSection (1): 在 test1rank 中是 1 而 test2rank 是 1 所以我的标志值应该是 same_rank.

2) SID (b) 和 SSection (2):在 test1rank 中是 2 并且 test2rank 是 3 这里的 rank 被改变了所以我的标志值应该是 rank_changed.

3) SID (d) 和 SSection (4): 在 test1rank 中是 2 而在 test2rank 中他失去了他的排名,所以我的标志值应该是 No_rank 4) SID (f) 和 SSection (4):在 test1rank 中他表现不佳所以他没有任何排名,在 test2rank 中他表现良好他的排名是 2,所以我的标志值应该是 New_rank

这应该能满足您的需求:

from pyspark.sql import functions as f

test3=test1.withColumnRenamed('SRank','test1SRank')\
    .join(test2.drop('SSection')\
    .withColumnRenamed('SRank','test2SRank'), on='SID', how='outer')\
    .fillna(0)

test3=test3.withColumn('flag', f.expr("case when test1SRank=0 and test2SRank>0 then 'new_rank'\
                                            when test1SRank>0 and test2SRank=0 then 'No_rank'\
                                            when test1SRank=test2SRank then 'same_rank'\
                                            else 'rank_changed' end"))
test3.orderBy('SID').show()

解释:外部加入数据框,因此您拥有所有 SID 的 test1 和 test2 分数。然后用 0 填充空值并使用 sql case when 语句执行标志逻辑。