How to use explode in PySpark?

I want to accomplish the following task in PySpark. Say I have this DataFrame:

df:

all_skills           dates  
['A','B']           2020-11-01
['B','I','R']       2020-11-01
['S','H']           2020-11-02
['A','H','S']       2020-11-02

In pandas, the following code gives me the desired output:

data_ready = data.explode('all_skills').groupby('all_skills')['dates'].value_counts().unstack('dates', fill_value=0)

output = data_ready.groupby(['all_skills']).sum()

 all_skills 2020-11-01     2020-11-02
    'A'        1                  1
    'B'        2                  0
    'S'        0                  2
    'H'        0                  2
    'R'        1                  0
    'I'        1                  0

Basically, I am counting the frequency of each element of the all_skills lists, grouped by day.
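Independent of pandas or Spark, the computation being asked for is just a frequency count of (skill, date) pairs. A minimal plain-Python sketch of that logic, using the example rows from the question:

```python
from collections import Counter

# The example rows from the question: (all_skills, date)
rows = [
    (['A', 'B'], "2020-11-01"),
    (['B', 'I', 'R'], "2020-11-01"),
    (['S', 'H'], "2020-11-02"),
    (['A', 'H', 'S'], "2020-11-02"),
]

# "Explode" each skill list into individual (skill, date) pairs and count them
counts = Counter((skill, date) for skills, date in rows for skill in skills)

print(counts[('B', '2020-11-01')])  # 2
print(counts[('A', '2020-11-02')])  # 1
print(counts[('S', '2020-11-01')])  # 0 (missing keys count as zero)
```

This is the same explode-then-count pattern that both the pandas and PySpark versions implement; the DataFrame APIs just add the pivot into a skills-by-dates table on top.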

You can explode the all_skills array, then group by, pivot on the date, and aggregate with count. Finally, apply coalesce to fill the null values with 0:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [
    (['A', 'B'], "2020-11-01"),
    (['B', 'I', 'R'], "2020-11-01"),
    (['S', 'H'], "2020-11-02"),
    (['A', 'H', 'S'], "2020-11-02"),
]

df = spark.createDataFrame(data, ("all_skills", "dates"))

# Explode the array so each skill gets its own row, then pivot on the date
pivoted_df = (df.withColumn("all_skills", F.explode("all_skills"))
              .groupBy("all_skills")
              .pivot("dates")
              .agg(F.count("all_skills")))

# Replace the nulls produced by the pivot with 0
final_df = pivoted_df.select([
    F.col(c) if c == "all_skills" else F.coalesce(F.col(c), F.lit(0)).alias(c)
    for c in pivoted_df.columns
])

final_df.show()

"""
+----------+----------+----------+
|all_skills|2020-11-01|2020-11-02|
+----------+----------+----------+
|         B|         2|         0|
|         A|         1|         1|
|         S|         0|         2|
|         R|         1|         0|
|         I|         1|         0|
|         H|         0|         2|
+----------+----------+----------+
"""

This might be what you are looking for:

(df1
 .withColumn('skill', F.explode('all_skills'))
 .drop('all_skills')
 .groupBy("dates", "skill")
 .agg(F.count("*").alias("count"))
 .groupBy("skill")
 .pivot("dates")
 .agg(F.sum("count"))
 .na.fill(0)
 .show())