Pyspark:对于每个月,对前 3 个月进行累计
Pyspark: For each month, make a cumulative sum of the previous 3 months
我正在使用 PYSPARK,我正在尝试计算特定月份过去 3 个月的累计总和:
示例:
Month Value
Jan/19 1
Feb/19 0
Mar/19 4
Apr/19 5
May/19 0
Jun/19 10
因此前几个月每个月的累计总和为:
Month Value
Jan/19 1
Feb/19 1 + 0 = 1
Mar/19 1+0+4 = 5
Apr/19 0+4+5 = 9
May/19 4+5+0 = 9
Jun/19 5+0+10 = 15
我很确定我需要使用 window 和分区函数,但我不知道如何设置它。
谁能帮我解决这个问题?
谢谢
示例数据帧:
df.show()
+------+-----+
| Month|Value|
+------+-----+
|Jan/19| 1|
|Feb/19| 0|
|Mar/19| 4|
|Apr/19| 5|
|May/19| 0|
|Jun/19| 10|
+------+-----+
您可以使用 window
函数,但您需要将 month
列转换为适当的timestamp
格式,然后将其转换为 long
以计算 range(3months)
基于 unix time
或 timestamp in seconds
。您可以对真实数据中的分组列进行分区。 (86400 是 1 天秒)。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().orderBy(F.col("Month").cast("long")).rangeBetween(-(86400*89), 0)
df\
.withColumn("Month", F.to_timestamp("Month","MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()
+-------------------+-----+---+
| Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00| 1| 1|
|2019-02-01 00:00:00| 0| 1|
|2019-03-01 00:00:00| 4| 5|
|2019-04-01 00:00:00| 5| 9|
|2019-05-01 00:00:00| 0| 9|
|2019-06-01 00:00:00| 10| 15|
+-------------------+-----+---+
如果您想在 each year
中返回 3 months only
。意思是 Jan/19
将只有 Jan/19
值 。对于这种情况,您应该使用 partitionBy
of Year
和 orderBy month number
,以及rangeBetween -2 and 0.
w=Window().partitionBy(F.year("Month")).orderBy(F.month("Month")).rangeBetween(-2, 0)
df\
.withColumn("Month", F.to_timestamp("Month","MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()
+-------------------+-----+---+
| Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00| 1| 1|
|2019-02-01 00:00:00| 0| 1|
|2019-03-01 00:00:00| 4| 5|
|2019-04-01 00:00:00| 5| 9|
|2019-05-01 00:00:00| 0| 9|
|2019-06-01 00:00:00| 10| 15|
+-------------------+-----+---+
我正在使用 PYSPARK,我正在尝试计算特定月份过去 3 个月的累计总和:
示例:
Month Value
Jan/19 1
Feb/19 0
Mar/19 4
Apr/19 5
May/19 0
Jun/19 10
因此前几个月每个月的累计总和为:
Month Value
Jan/19 1
Feb/19 1 + 0 = 1
Mar/19 1+0+4 = 5
Apr/19 0+4+5 = 9
May/19 4+5+0 = 9
Jun/19 5+0+10 = 15
我很确定我需要使用 window 和分区函数,但我不知道如何设置它。
谁能帮我解决这个问题?
谢谢
示例数据帧:
df.show()
+------+-----+
| Month|Value|
+------+-----+
|Jan/19| 1|
|Feb/19| 0|
|Mar/19| 4|
|Apr/19| 5|
|May/19| 0|
|Jun/19| 10|
+------+-----+
您可以使用 window
函数,但您需要将 month
列转换为适当的timestamp
格式,然后将其转换为 long
以计算 range(3months)
基于 unix time
或 timestamp in seconds
。您可以对真实数据中的分组列进行分区。 (86400 是 1 天秒)。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().orderBy(F.col("Month").cast("long")).rangeBetween(-(86400*89), 0)
df\
.withColumn("Month", F.to_timestamp("Month","MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()
+-------------------+-----+---+
| Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00| 1| 1|
|2019-02-01 00:00:00| 0| 1|
|2019-03-01 00:00:00| 4| 5|
|2019-04-01 00:00:00| 5| 9|
|2019-05-01 00:00:00| 0| 9|
|2019-06-01 00:00:00| 10| 15|
+-------------------+-----+---+
如果您想在 each year
中返回 3 months only
。意思是 Jan/19
将只有 Jan/19
值 。对于这种情况,您应该使用 partitionBy
of Year
和 orderBy month number
,以及rangeBetween -2 and 0.
w=Window().partitionBy(F.year("Month")).orderBy(F.month("Month")).rangeBetween(-2, 0)
df\
.withColumn("Month", F.to_timestamp("Month","MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()
+-------------------+-----+---+
| Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00| 1| 1|
|2019-02-01 00:00:00| 0| 1|
|2019-03-01 00:00:00| 4| 5|
|2019-04-01 00:00:00| 5| 9|
|2019-05-01 00:00:00| 0| 9|
|2019-06-01 00:00:00| 10| 15|
+-------------------+-----+---+