如何比较两个数据框并在 pyspark 中添加新的标志列?
How to compare two dataframes and add new flag column in pyspark?
我通过执行以下命令创建了两个数据框。
test1 = sc.parallelize([
("a",1,1),
("b",2,2),
("d",4,2),
("e",4,1),
("c",3,4)]).toDF(['SID','SSection','SRank'])
test1.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
| a| 1| 1|
| b| 2| 2|
| d| 4| 2|
| e| 4| 1|
| c| 3| 4|
+---+--------+-----+
test2=sc.parallelize([
("a",1,1),
("b",2,3),
("f",4,2),
("e",4,1),
("c",3,4)]).toDF(['SID','SSection','SRank'])
test2.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
| a| 1| 1|
| b| 2| 3|
| f| 4| 2|
| e| 4| 1|
| c| 3| 4|
+---+--------+-----+
使用 test1 和 test2 数据帧我需要生成新的数据帧,它应该包含如下结果。
+---+--------+----------+------------+------------+
|SID|SSection|test1SRank|test2SRank | flag |
+---+--------+----------+------------+------------+
| a| 1| 1 | 1 | same_rank |
| b| 2| 2 | 3 |rank_changed|
| d| 4| 2 | 0 |No_rank |
| e| 4| 1 | 1 |same_rank |
| c| 3| 4 | 4 |same_rank |
| f| 4| 0 | 2 |new_rank |
+---+--------+----------+------------+------------+
我想通过使用 SID
和 SSection
列的组合比较 test1 和 test2 数据帧以及 ranks
.
之间的比较来生成以上结果
例如:
1) SID (a)
和 SSection (1)
: 在 test1rank 中是 1
而 test2rank 是 1
所以我的标志值应该是 same_rank
.
2) SID (b)
和 SSection (2)
:在 test1rank 中是 2
并且 test2rank 是 3
这里的 rank 被改变了所以我的标志值应该是 rank_changed
.
3) SID (d)
和 SSection (4)
: 在 test1rank 中是 2
而在 test2rank 中他失去了他的排名,所以我的标志值应该是 No_rank
4) SID (f)
和 SSection (4)
:在 test1rank 中他表现不佳所以他没有任何排名,在 test2rank 中他表现良好他的排名是 2,所以我的标志值应该是 New_rank
这应该能满足您的需求:
from pyspark.sql import functions as f
test3=test1.withColumnRenamed('SRank','test1SRank')\
.join(test2.drop('SSection')\
.withColumnRenamed('SRank','test2SRank'), on='SID', how='outer')\
.fillna(0)
test3=test3.withColumn('flag', f.expr("case when test1SRank=0 and test2SRank>0 then 'new_rank'\
when test1SRank>0 and test2SRank=0 then 'No_rank'\
when test1SRank=test2SRank then 'same_rank'\
else 'rank_changed' end"))
test3.orderBy('SID').show()
解释:外部加入数据框,因此您拥有所有 SID 的 test1 和 test2 分数。然后用 0 填充空值并使用 sql case when 语句执行标志逻辑。
我通过执行以下命令创建了两个数据框。
test1 = sc.parallelize([
("a",1,1),
("b",2,2),
("d",4,2),
("e",4,1),
("c",3,4)]).toDF(['SID','SSection','SRank'])
test1.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
| a| 1| 1|
| b| 2| 2|
| d| 4| 2|
| e| 4| 1|
| c| 3| 4|
+---+--------+-----+
test2=sc.parallelize([
("a",1,1),
("b",2,3),
("f",4,2),
("e",4,1),
("c",3,4)]).toDF(['SID','SSection','SRank'])
test2.show()
+---+--------+-----+
|SID|SSection|SRank|
+---+--------+-----+
| a| 1| 1|
| b| 2| 3|
| f| 4| 2|
| e| 4| 1|
| c| 3| 4|
+---+--------+-----+
使用 test1 和 test2 数据帧我需要生成新的数据帧,它应该包含如下结果。
+---+--------+----------+------------+------------+
|SID|SSection|test1SRank|test2SRank | flag |
+---+--------+----------+------------+------------+
| a| 1| 1 | 1 | same_rank |
| b| 2| 2 | 3 |rank_changed|
| d| 4| 2 | 0 |No_rank |
| e| 4| 1 | 1 |same_rank |
| c| 3| 4 | 4 |same_rank |
| f| 4| 0 | 2 |new_rank |
+---+--------+----------+------------+------------+
我想通过使用 SID
和 SSection
列的组合比较 test1 和 test2 数据帧以及 ranks
.
例如:
1) SID (a)
和 SSection (1)
: 在 test1rank 中是 1
而 test2rank 是 1
所以我的标志值应该是 same_rank
.
2) SID (b)
和 SSection (2)
:在 test1rank 中是 2
并且 test2rank 是 3
这里的 rank 被改变了所以我的标志值应该是 rank_changed
.
3) SID (d)
和 SSection (4)
: 在 test1rank 中是 2
而在 test2rank 中他失去了他的排名,所以我的标志值应该是 No_rank
4) SID (f)
和 SSection (4)
:在 test1rank 中他表现不佳所以他没有任何排名,在 test2rank 中他表现良好他的排名是 2,所以我的标志值应该是 New_rank
这应该能满足您的需求:
from pyspark.sql import functions as f
test3=test1.withColumnRenamed('SRank','test1SRank')\
.join(test2.drop('SSection')\
.withColumnRenamed('SRank','test2SRank'), on='SID', how='outer')\
.fillna(0)
test3=test3.withColumn('flag', f.expr("case when test1SRank=0 and test2SRank>0 then 'new_rank'\
when test1SRank>0 and test2SRank=0 then 'No_rank'\
when test1SRank=test2SRank then 'same_rank'\
else 'rank_changed' end"))
test3.orderBy('SID').show()
解释:外部加入数据框,因此您拥有所有 SID 的 test1 和 test2 分数。然后用 0 填充空值并使用 sql case when 语句执行标志逻辑。