如何在 PySpark DataFrame 列中查找连续值并替换值
How to Find the consecutive values in PySpark DataFrame column and replace the value
我有以下数据框
ID Name Dept
1 John ABC
2 Rio BCD
3 Marry BCD
4 Andy BCD
5 Smith PQR
6 Rich XYZ
7 Lisa LMN
8 Steve LMN
9 Ali STU
我们可以看到在 Dept 列中 BCD 重复了 3 次,LMN 重复了 2 次。
现在我需要创建新列 Dept_Updated 并检查连续值,如果有连续值只需在最后添加下划线并在下划线后添加数字,如果不是连续值则保留为是的。
我需要以下格式的输出。
ID Name Dept Dept_Updated
1 John ABC ABC
2 Rio BCD BCD_1
3 Marry BCD BCD_2
4 Andy BCD BCD_3
5 Smith PQR PQR
6 Rich XYZ XYZ
7 Lisa LMN LMN_1
8 Steve LMN LMN_2
9 Ali STU STU
我是 PySpark 的新手,有什么方法可以实现上述输出,这真的很有帮助。
我们需要按 Dept 分区的 window,并且还需要仅检查连续条目。为此,我提出类似下面的建议,它将检查下一行是否与当前行相同,并且只为具有重复项的条目附加 Rnk 列(计数列):
from pyspark.sql import functions as F, Window as W
w = W.orderBy('ID')
w1 = W.partitionBy("Dept").orderBy("Dept")
condition = F.col("Check_Duplicate")| ((F.col("CheckLength")>1) & (F.col("Rnk")==1))
new_df = df.withColumn("Check_Duplicate",F.col("Dept")==F.lag("Dept").over(w))\
.withColumn("Rnk",F.row_number().over(w1))\
.withColumn("CheckLength",F.count("Dept").over(w1))\
.withColumn("Dept_Updated",F.when(condition,F.concat_ws("_",*["Dept","Rnk"]))
.otherwise(F.col("Dept")))
new_df.select(*df.columns,'Dept_Updated').orderBy("ID").show()
输出:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
| 1| John| ABC| ABC|
| 2| Rio| BCD| BCD_1|
| 3|Marry| BCD| BCD_2|
| 4| Andy| BCD| BCD_3|
| 5|Smith| PQR| PQR|
| 6| Rich| XYZ| XYZ|
| 7| Lisa| LMN| LMN_1|
| 8|Steve| LMN| LMN_2|
| 9| Ali| STU| STU|
+---+-----+----+------------+
测试表明如果 Dept 没有连续重复,代码不会附加行号:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
| 1| John| ABC| ABC|
| 2| Rio| BCD| BCD_1|
| 3|Marry| BCD| BCD_2|
| 4| Andy| BCD| BCD_3|
| 5|Smith| PQR| PQR|
| 6| Rich| BCD| BCD| # <-- This entry is repeated but not consecutive
| 7| Lisa| LMN| LMN_1|
| 8|Steve| LMN| LMN_2|
| 9| Ali| STU| STU|
我有以下数据框
ID Name Dept
1 John ABC
2 Rio BCD
3 Marry BCD
4 Andy BCD
5 Smith PQR
6 Rich XYZ
7 Lisa LMN
8 Steve LMN
9 Ali STU
我们可以看到在 Dept 列中 BCD 重复了 3 次,LMN 重复了 2 次。
现在我需要创建新列 Dept_Updated 并检查连续值,如果有连续值只需在最后添加下划线并在下划线后添加数字,如果不是连续值则保留为是的。
我需要以下格式的输出。
ID Name Dept Dept_Updated
1 John ABC ABC
2 Rio BCD BCD_1
3 Marry BCD BCD_2
4 Andy BCD BCD_3
5 Smith PQR PQR
6 Rich XYZ XYZ
7 Lisa LMN LMN_1
8 Steve LMN LMN_2
9 Ali STU STU
我是 PySpark 的新手,有什么方法可以实现上述输出,这真的很有帮助。
我们需要按 Dept 分区的 window,并且还需要仅检查连续条目。为此,我提出类似下面的建议,它将检查下一行是否与当前行相同,并且只为具有重复项的条目附加 Rnk 列(计数列):
from pyspark.sql import functions as F, Window as W
w = W.orderBy('ID')
w1 = W.partitionBy("Dept").orderBy("Dept")
condition = F.col("Check_Duplicate")| ((F.col("CheckLength")>1) & (F.col("Rnk")==1))
new_df = df.withColumn("Check_Duplicate",F.col("Dept")==F.lag("Dept").over(w))\
.withColumn("Rnk",F.row_number().over(w1))\
.withColumn("CheckLength",F.count("Dept").over(w1))\
.withColumn("Dept_Updated",F.when(condition,F.concat_ws("_",*["Dept","Rnk"]))
.otherwise(F.col("Dept")))
new_df.select(*df.columns,'Dept_Updated').orderBy("ID").show()
输出:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
| 1| John| ABC| ABC|
| 2| Rio| BCD| BCD_1|
| 3|Marry| BCD| BCD_2|
| 4| Andy| BCD| BCD_3|
| 5|Smith| PQR| PQR|
| 6| Rich| XYZ| XYZ|
| 7| Lisa| LMN| LMN_1|
| 8|Steve| LMN| LMN_2|
| 9| Ali| STU| STU|
+---+-----+----+------------+
测试表明如果 Dept 没有连续重复,代码不会附加行号:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
| 1| John| ABC| ABC|
| 2| Rio| BCD| BCD_1|
| 3|Marry| BCD| BCD_2|
| 4| Andy| BCD| BCD_3|
| 5|Smith| PQR| PQR|
| 6| Rich| BCD| BCD| # <-- This entry is repeated but not consecutive
| 7| Lisa| LMN| LMN_1|
| 8|Steve| LMN| LMN_2|
| 9| Ali| STU| STU|