如何在 PySpark collect_list 中维护排序顺序并收集多个列表
How to maintain sort order in PySpark collect_list and collect multiple lists
我想维护日期排序顺序,对多个列使用 collect_list,所有列都具有相同的日期顺序。我需要将它们放在同一个数据框中,以便我可以利用它们来创建时间序列模型输入。以下是 "train_data":
的示例
我将 Window 与 PartitionBy 一起使用,以确保每个 Syscode_Stn 的排序顺序为 tuning_evnt_start_dt。我可以用这段代码创建一列:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
)\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
但是如何在同一个新数据框中创建两列?
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))
请注意,MarchMadInd 未显示在屏幕截图中,但包含在 train_data 中。解释我是如何到达现在的位置的:
是的,正确的方法是添加连续的 .withColumn 语句,然后是删除每个数组重复项的 .agg 语句。
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data.withColumn('spp_imp_daily',
F.collect_list('spp_imp_daily').over(w)
)\
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
F.max('MarchMadInd').alias('MarchMadInd')
)
我想维护日期排序顺序,对多个列使用 collect_list,所有列都具有相同的日期顺序。我需要将它们放在同一个数据框中,以便我可以利用它们来创建时间序列模型输入。以下是 "train_data":
的示例我将 Window 与 PartitionBy 一起使用,以确保每个 Syscode_Stn 的排序顺序为 tuning_evnt_start_dt。我可以用这段代码创建一列:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
)\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
但是如何在同一个新数据框中创建两列?
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))
请注意,MarchMadInd 未显示在屏幕截图中,但包含在 train_data 中。解释我是如何到达现在的位置的:
是的,正确的方法是添加连续的 .withColumn 语句,然后是删除每个数组重复项的 .agg 语句。
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data.withColumn('spp_imp_daily',
F.collect_list('spp_imp_daily').over(w)
)\
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
F.max('MarchMadInd').alias('MarchMadInd')
)