How to use explode in PySpark?
I want to accomplish the following task in PySpark. Suppose I have the following dataframe:
df:
all_skills dates
['A','B'] 2020-11-01
['B','I','R'] 2020-11-01
['S','H'] 2020-11-02
['A','H','S'] 2020-11-02
In pandas, the following code gives me the desired output:
`data_ready = data.explode('all_skills').groupby('all_skills')['dates'].value_counts().unstack('dates', fill_value=0)
output = data_ready.groupby(['all_skills']).sum()`
all_skills 2020-11-01 2020-11-02
'A' 1 1
'B' 2 0
'S' 0 2
'H' 0 2
'R' 1 0
Basically, I am counting how often each element of the all_skills lists occurs on each day.
You can explode the all_skills array, then group by and pivot with a count aggregation. Finally, apply coalesce to fill the resulting null values with 0.
from pyspark.sql import functions as F

data = [(['A', 'B'], "2020-11-01",),
        (['B', 'I', 'R'], "2020-11-01",),
        (['S', 'H'], "2020-11-02",),
        (['A', 'H', 'S'], "2020-11-02",), ]
df = spark.createDataFrame(data, ("all_skills", "dates",))

# Explode the array so each skill gets its own row, then pivot on the date
# and count how many times each skill occurs per date.
pivoted_df = (df.withColumn("all_skills", F.explode("all_skills"))
                .groupBy("all_skills")
                .pivot("dates")
                .agg(F.count("all_skills")))

# Pivoting leaves nulls for (skill, date) combinations that never occur;
# coalesce each date column with 0 to fill them.
final_df = pivoted_df.select([F.col("all_skills") if col_name == "all_skills"
                              else F.coalesce(F.col(col_name), F.lit(0)).alias(col_name)
                              for col_name in pivoted_df.columns])
final_df.show()
"""
+----------+----------+----------+
|all_skills|2020-11-01|2020-11-02|
+----------+----------+----------+
| B| 2| 0|
| A| 1| 1|
| S| 0| 2|
| R| 1| 0|
| I| 1| 0|
| H| 0| 2|
+----------+----------+----------+
"""
This might be what you're looking for.
# df1 is assumed to be the input dataframe from the question
(df1
 .withColumn('skill', F.explode('all_skills')).drop('all_skills')
 .groupBy("dates", "skill")
 .agg(F.count("*").alias("count"))   # count each skill per day
 .groupBy("skill")
 .pivot("dates")
 .agg(F.sum("count"))                # spread the daily counts into date columns
 .na.fill(0)                         # replace the nulls produced by pivot with 0
 .show())
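Note that na.fill(0) here plays the same role as the per-column coalesce in the first answer: the pivot step produces nulls for every (skill, date) combination that never occurred, and filling them with 0 yields the same frequency table.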