Spark SQL 是否优化具有重复表达式的查询?
Does Spark SQL optimize queries with repeated expressions?
鉴于以下情况
from pyspark.sql import functions, window
f = functions.rank()
w1 = window.Window.partitionBy("column")
w2 = window.Window.partitionBy("column")
col = functions.col("column * 42")
和数据帧 df
,
的性能有什么不同吗
df.select(f.over(w1), f.over(w2))
对
df.select(f.over(w1), f.over(w1))
?
那
呢
df.select(col + 1, col + 2)
对
df.select(functions.expr("column * 42 + 1"), functions.expr("column * 42 + 2")
?
(随意想象任意复杂的表达式来代替 column * 42
)
即重用 Column- 和 Window- 实例与即时构建这些表达式有什么好处吗?
我希望 Spark SQL 能够对此进行适当优化,但找不到关于此的决定性答案。
此外,我是否可以通过检查 df.explain()
的结果自己回答这个问题?如果可以,我应该寻找什么?
Feel free to imagine arbitrarily complex expressions in place of column * 42
...甚至任何 非确定性 表达式,例如生成随机数或当前时间戳。
每当你问这样的问题时,使用 explain
运算符来查看 Spark SQL 在幕后处理的内容(实际上应该与正在使用的编程语言和函数或方法无关,不应该吗?)
那么,在以下非确定性查询(或完全确定性,但乍一看是非确定性)的掩护下发生了什么:
val q = spark.range(1)
.select(
current_timestamp as "now", // <-- this should be the same as the following line?
current_timestamp as "now_2",
rand as "r1", // <-- what about this and the following lines?
rand as "r2",
rand as "r3")
scala> q.show(truncate = false)
+-----------------------+-----------------------+-------------------+------------------+------------------+
|now |now_2 |r1 |r2 |r3 |
+-----------------------+-----------------------+-------------------+------------------+------------------+
|2017-12-13 15:17:46.305|2017-12-13 15:17:46.305|0.33579358107333823|0.9478025260069644|0.5846726225651472|
+-----------------------+-----------------------+-------------------+------------------+------------------+
我实际上有点惊讶地注意到 rand
都产生了不同的结果,因为我假设结果是相同的。答案在...rand 的源代码中,如果没有明确定义,您可以看到它使用不同的种子(今天学习!谢谢)。
def rand(): Column = rand(Utils.random.nextLong)
答案是使用带有显式 seed
的 rand
版本,因为这将在整个查询中为您提供具有相同 seed
的相同 Rand
逻辑运算符.
val seed = 1
val q = spark.range(1)
.select(
current_timestamp as "now", // <-- this should be the same as the following line?
current_timestamp as "now_2",
rand(seed) as "r1", // <-- what about this and the following lines?
rand(seed) as "r2",
rand(seed) as "r3")
scala> q.show(false)
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|now |now_2 |r1 |r2 |r3 |
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|2017-12-13 15:43:59.019|2017-12-13 15:43:59.019|0.06498948189958098|0.06498948189958098|0.06498948189958098|
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
Spark SQL 知道您在结构化查询中使用了什么,因为 Spark SQL 的高级 API 称为 DataFrame
或 Dataset
只是一个包装跨语言相同的逻辑运算符(Python、Scala、Java、R、SQL)。
只需查看任何函数的源代码,您就会看到一个 Catalyst 表达式(例如 rand) or a Dataset operator (e.g. select),您会看到一个或一棵逻辑运算符树。
最后,Spark SQL 使用基于规则的优化器,该优化器使用规则来优化您的查询并查找重复项。
那么,让我们看看你的案例(比 rand
更具确定性)。
(我使用的是 Scala,但差异在于语言而非优化级别)
import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("column").orderBy("column")
val w2 = Window.partitionBy("column").orderBy("column")
在您的情况下,您使用了需要订购数据集的 rank
,因此我添加了 orderBy
子句以使 window 规范完整。
scala> w1 == w2
res1: Boolean = false
它们确实与Scala的观点不同
val df = spark.range(5).withColumnRenamed("id", "column")
scala> df.show
+------+
|column|
+------+
| 0|
| 1|
| 2|
| 3|
| 4|
+------+
有了数据集(这与我们的讨论几乎无关),让我们创建一个结构化查询并explain
它来查看 Spark SQL 执行的物理计划。
val q = df.select(rank over w1, rank over w2)
scala> q.explain
== Physical Plan ==
*Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
+- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
+- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(column#156L, 200)
+- *Project [id#153L AS column#156L]
+- *Range (0, 5, step=1, splits=8)
让我们使用带编号的输出,以便我们可以引用描述中的每一行。
val plan = q.queryExecution.executedPlan
scala> println(plan.numberedTreeString)
00 *Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
01 +- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
02 +- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
03 +- Exchange hashpartitioning(column#156L, 200)
04 +- *Project [id#153L AS column#156L]
05 +- *Range (0, 5, step=1, splits=8)
这样您就可以看到查询是否与另一个查询相似,如果有的话有什么不同。这是您可以获得的最明确的答案,而且……令人惊讶……Spark 版本之间可能(并且经常)发生变化。
I.e. is there any benefit in reusing Column- and Window-instances vs constructing these expressions on the fly?
我不会考虑太多,因为我希望 Spark 能够在内部处理它(正如您可能已经注意到的那样,我很惊讶 rand
的工作方式不同)。
只需使用explain
查看物理计划,您就可以自己回答问题。
鉴于以下情况
from pyspark.sql import functions, window
f = functions.rank()
w1 = window.Window.partitionBy("column")
w2 = window.Window.partitionBy("column")
col = functions.col("column * 42")
和数据帧 df
,
df.select(f.over(w1), f.over(w2))
对
df.select(f.over(w1), f.over(w1))
?
那
呢df.select(col + 1, col + 2)
对
df.select(functions.expr("column * 42 + 1"), functions.expr("column * 42 + 2")
?
(随意想象任意复杂的表达式来代替 column * 42
)
即重用 Column- 和 Window- 实例与即时构建这些表达式有什么好处吗?
我希望 Spark SQL 能够对此进行适当优化,但找不到关于此的决定性答案。
此外,我是否可以通过检查 df.explain()
的结果自己回答这个问题?如果可以,我应该寻找什么?
Feel free to imagine arbitrarily complex expressions in place of column * 42
...甚至任何 非确定性 表达式,例如生成随机数或当前时间戳。
每当你问这样的问题时,使用 explain
运算符来查看 Spark SQL 在幕后处理的内容(实际上应该与正在使用的编程语言和函数或方法无关,不应该吗?)
那么,在以下非确定性查询(或完全确定性,但乍一看是非确定性)的掩护下发生了什么:
val q = spark.range(1)
.select(
current_timestamp as "now", // <-- this should be the same as the following line?
current_timestamp as "now_2",
rand as "r1", // <-- what about this and the following lines?
rand as "r2",
rand as "r3")
scala> q.show(truncate = false)
+-----------------------+-----------------------+-------------------+------------------+------------------+
|now |now_2 |r1 |r2 |r3 |
+-----------------------+-----------------------+-------------------+------------------+------------------+
|2017-12-13 15:17:46.305|2017-12-13 15:17:46.305|0.33579358107333823|0.9478025260069644|0.5846726225651472|
+-----------------------+-----------------------+-------------------+------------------+------------------+
我实际上有点惊讶地注意到 rand
都产生了不同的结果,因为我假设结果是相同的。答案在...rand 的源代码中,如果没有明确定义,您可以看到它使用不同的种子(今天学习!谢谢)。
def rand(): Column = rand(Utils.random.nextLong)
答案是使用带有显式 seed
的 rand
版本,因为这将在整个查询中为您提供具有相同 seed
的相同 Rand
逻辑运算符.
val seed = 1
val q = spark.range(1)
.select(
current_timestamp as "now", // <-- this should be the same as the following line?
current_timestamp as "now_2",
rand(seed) as "r1", // <-- what about this and the following lines?
rand(seed) as "r2",
rand(seed) as "r3")
scala> q.show(false)
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|now |now_2 |r1 |r2 |r3 |
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|2017-12-13 15:43:59.019|2017-12-13 15:43:59.019|0.06498948189958098|0.06498948189958098|0.06498948189958098|
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
Spark SQL 知道您在结构化查询中使用了什么,因为 Spark SQL 的高级 API 称为 DataFrame
或 Dataset
只是一个包装跨语言相同的逻辑运算符(Python、Scala、Java、R、SQL)。
只需查看任何函数的源代码,您就会看到一个 Catalyst 表达式(例如 rand) or a Dataset operator (e.g. select),您会看到一个或一棵逻辑运算符树。
最后,Spark SQL 使用基于规则的优化器,该优化器使用规则来优化您的查询并查找重复项。
那么,让我们看看你的案例(比 rand
更具确定性)。
(我使用的是 Scala,但差异在于语言而非优化级别)
import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("column").orderBy("column")
val w2 = Window.partitionBy("column").orderBy("column")
在您的情况下,您使用了需要订购数据集的 rank
,因此我添加了 orderBy
子句以使 window 规范完整。
scala> w1 == w2
res1: Boolean = false
它们确实与Scala的观点不同
val df = spark.range(5).withColumnRenamed("id", "column")
scala> df.show
+------+
|column|
+------+
| 0|
| 1|
| 2|
| 3|
| 4|
+------+
有了数据集(这与我们的讨论几乎无关),让我们创建一个结构化查询并explain
它来查看 Spark SQL 执行的物理计划。
val q = df.select(rank over w1, rank over w2)
scala> q.explain
== Physical Plan ==
*Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
+- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
+- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(column#156L, 200)
+- *Project [id#153L AS column#156L]
+- *Range (0, 5, step=1, splits=8)
让我们使用带编号的输出,以便我们可以引用描述中的每一行。
val plan = q.queryExecution.executedPlan
scala> println(plan.numberedTreeString)
00 *Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
01 +- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
02 +- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
03 +- Exchange hashpartitioning(column#156L, 200)
04 +- *Project [id#153L AS column#156L]
05 +- *Range (0, 5, step=1, splits=8)
这样您就可以看到查询是否与另一个查询相似,如果有的话有什么不同。这是您可以获得的最明确的答案,而且……令人惊讶……Spark 版本之间可能(并且经常)发生变化。
I.e. is there any benefit in reusing Column- and Window-instances vs constructing these expressions on the fly?
我不会考虑太多,因为我希望 Spark 能够在内部处理它(正如您可能已经注意到的那样,我很惊讶 rand
的工作方式不同)。
只需使用explain
查看物理计划,您就可以自己回答问题。