Spark: Is there a difference between the agg function and a window function on a Spark DataFrame?
I want to apply a sum over a column of a Spark DataFrame (Spark 2.1), and I see two ways to do it:
1- Using a Window function:
val windowing = Window.partitionBy("id")
dataframe
.withColumn("sum", sum(col("column_1")) over windowing)
2- Using an aggregation function:
dataframe
.groupBy("id")
.agg(sum(col("column_1")).alias("sum"))
Which one is better in terms of performance, and what is the difference between the two approaches?
You can use aggregation functions within a window (your first case) or when grouping (your second case). The difference is that with a window, each row is associated with the aggregation result computed over its entire window, whereas when grouping, each group is associated with the aggregation result for that group (a group of rows collapses into a single row).
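The snippets in this answer assume a running SparkSession and the standard Spark SQL imports; a minimal setup sketch (the names here are assumptions, not part of the original answer):
// Assumed setup, not shown in the original answer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._          // sum, col, ...
import org.apache.spark.sql.expressions.Window   // Window.partitionBy
val spark = SparkSession.builder().appName("agg-vs-window").master("local[*]").getOrCreate()
import spark.implicits._                         // enables the 'id and $"..." column syntax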
In your case, you would get the following.
val dataframe = spark.range(6).withColumn("key", 'id % 2)
dataframe.show
+---+---+
| id|key|
+---+---+
| 0| 0|
| 1| 1|
| 2| 0|
| 3| 1|
| 4| 0|
| 5| 1|
+---+---+
Case 1: windowing
val windowing = Window.partitionBy("key")
dataframe.withColumn("sum", sum(col("id")) over windowing).show
+---+---+---+
| id|key|sum|
+---+---+---+
| 0| 0| 6|
| 2| 0| 6|
| 4| 0| 6|
| 1| 1| 9|
| 3| 1| 9|
| 5| 1| 9|
+---+---+---+
Case 2: grouping
dataframe.groupBy("key").agg(sum('id)).show
+---+-------+
|key|sum(id)|
+---+-------+
| 0| 6|
| 1| 9|
+---+-------+
As @Oli mentioned, aggregation functions can be used within a window (the first case) as well as with grouping (the second case). In terms of performance, aggregation with grouping is considerably faster than aggregation with a window. We can see why by analyzing the physical plans.
df.show
+---+----------+
| id| expense|
+---+----------+
| 1| 100|
| 2| 300|
| 1| 100|
| 3| 200|
+---+----------+
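The answer uses df and window without showing how they are defined. A plausible setup, inferred from the output above and the physical plans below (which suggest id is a string column and expense an integer), would be:
// Assumed setup, not shown in the original answer:
val df = Seq(("1", 100), ("2", 300), ("1", 100), ("3", 200)).toDF("id", "expense")
val window = Window.partitionBy("id")   // the window spec used in the snippets below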
1- Aggregation with a Window:
df.withColumn("total_expense", sum(col("expense")) over window).show
+---+----------+-------------------+
| id| expense| total_expense|
+---+----------+-------------------+
| 3| 200| 200|
| 1| 100| 200|
| 1| 100| 200|
| 2| 300| 300|
+---+----------+-------------------+
df.withColumn("total_expense", sum(col("expense")) over window).explain
== Physical Plan ==
Window [sum(cast(expense#9 as bigint)) windowspecdefinition(id#8, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total_expense#265L], [id#8]
+- *(2) Sort [id#8 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#8, 200), true, [id=#144]
+- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
+- Scan[obj#2]
2- Aggregation with groupBy:
df.groupBy("id").agg(sum($"expense").alias("total_expense")).show
+---+------------------+
| id| total_expense|
+---+------------------+
| 3| 200|
| 1| 200|
| 2| 300|
+---+------------------+
df.groupBy("id").agg(sum($"expense").alias("total_expense")).explain()
== Physical Plan ==
*(2) HashAggregate(keys=[id#8], functions=[sum(cast(expense#9 as bigint))])
+- Exchange hashpartitioning(id#8, 200), true, [id=#44]
+- *(1) HashAggregate(keys=[id#8], functions=[partial_sum(cast(expense#9 as bigint))])
+- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
+- Scan[obj#2]
From the execution plans we can see that the window case requires a full shuffle plus a sort, while the groupBy case only needs a reduced shuffle (the data is shuffled after a local partial aggregation, partial_sum).
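If per-row totals are needed but the window's sort turns out to be the bottleneck, one alternative worth measuring (a sketch, not part of the original answers) is to compute the grouped sums and join them back onto the rows; whether this actually beats the window depends on your data and cluster:
// Sketch: per-row totals via groupBy + join instead of a window function
val totals = df.groupBy("id").agg(sum($"expense").alias("total_expense"))
df.join(totals, Seq("id")).show()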