Efficient way to do cumulative sum on multiple columns in PySpark
I have a table that looks like:
+----+----+----+-----+
|time|val1|val2|class|
+----+----+----+-----+
|   1|   3|   2|    b|
|   2|   3|   1|    b|
|   1|   2|   4|    a|
|   2|   2|   5|    a|
|   3|   1|   5|    a|
+----+----+----+-----+
Now I want to compute a cumulative sum over the val1 and val2 columns, so I created a window specification:
from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
new_df = (my_df.withColumn('cum_sum1', F.sum("val1").over(windowval))
               .withColumn('cum_sum2', F.sum("val2").over(windowval)))
But I think Spark will apply the window function twice on the original table, which seems less efficient. Since the problem is quite simple, is there a way to apply the window function only once and compute the cumulative sum for both columns together?
Your assumption is incorrect. It is enough to look at the optimized logical plan
== Optimized Logical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- LogicalRDD [time#0L, val1#1L, val2#2L, class#3], false
or the physical plan
== Physical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- *(1) Sort [class#3 ASC NULLS FIRST, time#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(class#3, 200)
+- Scan ExistingRDD[time#0L,val1#1L,val2#2L,class#3]
Both clearly show that the Window is applied only once.
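For reference, here is a minimal, self-contained sketch of how such plans can be inspected yourself. The local SparkSession setup and the sample data construction are assumptions for illustration; explain(True) is the standard PySpark way to print the parsed, analyzed, and optimized logical plans plus the physical plan.

# Sketch: rebuild the example data and print the query plans to verify
# that only a single Window operator appears.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()  # assumed local session

my_df = spark.createDataFrame(
    [(1, 3, 2, "b"), (2, 3, 1, "b"), (1, 2, 4, "a"), (2, 2, 5, "a"), (3, 1, 5, "a")],
    ["time", "val1", "val2", "class"],
)

windowval = (Window.partitionBy("class").orderBy("time")
             .rangeBetween(Window.unboundedPreceding, 0))

new_df = (my_df.withColumn("cum_sum1", F.sum("val1").over(windowval))
               .withColumn("cum_sum2", F.sum("val2").over(windowval)))

# Prints all plan stages; both sums share one windowspecdefinition,
# so they are evaluated in a single Window operator.
new_df.explain(True)

Because both column expressions use the same window specification, Catalyst collapses them into one Window node, so chaining two withColumn calls costs no extra shuffle or sort compared to computing both sums at once.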