Order of evaluation of predicates in Spark SQL where clause
I am trying to understand the order in which predicates are evaluated in Spark SQL, in order to improve the performance of my queries.
Let's say I have the following query
"select * from tbl where pred1 and pred2"
and let's say neither of the predicates qualifies as a pushed-down filter (to simplify things).
Also assume that pred1 is computationally much more complex than pred2 (say, regex pattern matching vs. a negation).
- Is there any way to verify that Spark will evaluate pred1 before pred2?
- Is this deterministic?
- Is this controllable?
- Is there any way to see the final execution plan?
In general
Good question.
As I could not find suitable documentation, I deduced the answer from test scenarios and inference. This is a second attempt, since the various claims found on the web could not be backed up.
This question, I think, is not about AQE / Spark 3.x aspects; rather it is about, say, a dataframe that is part of Stage N of a Spark App, has already passed the stage of acquiring data from sources at rest, and is now subject to filtering with multiple predicates applied.
Then the central point is: does it matter how the predicates are ordered, or does Spark (Catalyst) re-order the predicates to minimize the work to be done?
- The premise here is that filtering the maximum amount of data out first makes more sense than evaluating a predicate that filters very
little out.
- This is a well-known RDBMS point referring to sargable predicates (subject to evolution of definition over time).
- A lot of the discussion focuses on indexes; Spark and Hive do not have these, but DataFrames are columnar.
Point 1
You can try, e.g. in a %sql cell:
EXPLAIN EXTENDED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
From this you can see whether any re-arranging of predicates is going on, but I saw no such re-arrangement in the Physical Plan in non-AQE mode on Databricks. Refer to
https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-explain.html.
I have read here and there that Catalyst can re-arrange filtering. To what extent would require a lot of research; I was not able to confirm it.
Also an interesting read:
https://www.waitingforcode.com/apache-spark-sql/catalyst-optimizer-in-spark-sql/read
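To cover the "can I see the final execution plan" part from the DataFrame side as well: explain on a DataFrame prints the same plans without a %sql cell. A minimal sketch, reusing the df and the myUdf UDF defined in Point 2 below:
// Prints the parsed/analyzed/optimized logical plans and the physical plan;
// any re-ordering of the predicates inside the Filter node would show up here.
df.where("hc = -4 and myUdf(text) = 3").explain(true)
// Spark 3.x also accepts a mode string: "simple", "extended", "codegen", "cost", "formatted"
df.where("hc = -4 and myUdf(text) = 3").explain("extended")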
Point 2
I ran the following pathetic, contrived examples with the same functional query but with the predicates reversed, using a column with high cardinality and testing for a value that does not in fact exist, and then compared the value of an accumulator incremented inside a UDF each time it is called.
Scenario 1
import org.apache.spark.sql.functions._
def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
def randomInt1to10 = scala.util.Random.nextInt(10) + 1
def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1
// 1M rows: "hc" is high-cardinality, "lc" low-cardinality, "text" is "nuid" padded/truncated to 3 chars
val df = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000, randomInt1to1000000000, randomInt1to10)})
  .toDF("nuid", "hc", "lc")
  .withColumn("text", lpad($"nuid", 3, "0"))
  .withColumn("literal", lit(1))
val accumulator = sc.longAccumulator("udf_call_count")
// UDF that increments the accumulator every time it is actually invoked
spark.udf.register("myUdf", (x: String) => {
  accumulator.add(1)
  x.length
})
accumulator.reset()
// expensive UDF predicate first, cheap "hc = -4" predicate second
df.where("myUdf(text) = 3 and hc = -4").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")
returns:
+---------+
|max(text)|
+---------+
|null |
+---------+
Number of UDF calls 1000000
Scenario 2
import org.apache.spark.sql.functions._
def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
def randomInt1to10 = scala.util.Random.nextInt(10) + 1
def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1
// same data shape as Scenario 1
val dfA = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000, randomInt1to1000000000, randomInt1to10)})
  .toDF("nuid", "hc", "lc")
  .withColumn("text", lpad($"nuid", 3, "0"))
  .withColumn("literal", lit(1))
val accumulator = sc.longAccumulator("udf_call_count")
// UDF that increments the accumulator every time it is actually invoked
spark.udf.register("myUdf", (x: String) => {
  accumulator.add(1)
  x.length
})
accumulator.reset()
// cheap "hc = -4" predicate first, expensive UDF predicate second
dfA.where("hc = -4 and myUdf(text) = 3").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")
returns:
+---------+
|max(text)|
+---------+
|null |
+---------+
Number of UDF calls 0
My conclusion:
Evaluation proceeded left-to-right - in this case - since the UDF was not called at all: the accumulator value in Scenario 2 is 0, whereas Scenario 1 registered 1M calls.
So the predicate-processing order that ORACLE and DB2 may apply might well not hold for Stage 1 predicate execution.
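If you want the expensive UDF to be evaluated only for rows that pass the cheap predicate regardless of any re-ordering, the conditional-expression pattern recommended in the documentation quoted in Point 3 below can be applied here too. A sketch, reusing dfA, myUdf and the accumulator from Scenario 2 (not part of the original test):
accumulator.reset()
// The UDF call sits inside the conditional branch, so per the quoted documentation
// it should only be invoked for rows where hc = -4
dfA.where("if(hc = -4, myUdf(text) = 3, false)").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")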
Point 3
I note, however, the following from the manual
https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html:
Evaluation order and null checking
Spark SQL (including SQL and the DataFrame and Dataset APIs) does not
guarantee the order of evaluation of subexpressions. In particular,
the inputs of an operator or function are not necessarily evaluated
left-to-right or in any other fixed order. For example, logical AND
and OR expressions do not have left-to-right “short-circuiting”
semantics.
Therefore, it is dangerous to rely on the side effects or order of
evaluation of Boolean expressions, and the order of WHERE and HAVING
clauses, since such expressions and clauses can be reordered during
query optimization and planning. Specifically, if a UDF relies on
short-circuiting semantics in SQL for null checking, there’s no
guarantee that the null check will happen before invoking the UDF. For
example,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
This WHERE clause does not guarantee the strlen UDF to be invoked
after filtering out nulls.
To perform proper null checking, we recommend that you do either of
the following:
- Make the UDF itself null-aware and do null checking inside the UDF itself.
- Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch.
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
A slight contradiction with the left-to-right behaviour observed in Point 2, so the observed order cannot be relied upon.
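For completeness, the same IF/CASE WHEN guard can be expressed through the DataFrame API. A sketch, assuming the test1 table from the documentation example above (udf, when and col are standard org.apache.spark.sql.functions):
import org.apache.spark.sql.functions.{col, udf, when}
// Null check and UDF call live in one conditional expression, so the UDF
// is only evaluated in the non-null branch, mirroring the SQL IF example above.
val strlenUdf = udf((s: String) => s.length)
spark.table("test1")
  .where(when(col("s").isNotNull, strlenUdf(col("s"))) > 1)
  .select("s")
  .show()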