Order of evaluation of predicates in Spark SQL where clause

I am trying to understand the order in which predicates are evaluated in Spark SQL, in order to improve the performance of my queries.
Suppose I have the following query:

"select * from tbl where pred1 and pred2"

Also, let's say none of the predicates qualify as pushdown filters (to keep things simple). Further assume that pred1 is computationally much more complex than pred2 (say, a regex pattern match versus a negation).
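
For concreteness (my own illustration, not part of the original question), pred1 and pred2 could for instance look like this, with hypothetical columns text and flag:

// Illustration only: pred1 is an expensive regex pattern match, pred2 a cheap negation.
spark.sql("""
  select *
  from tbl
  where text rlike '^[a-z]{3}[0-9]{4}$'  -- pred1: regex pattern match (expensive)
    and not flag                         -- pred2: simple negation (cheap)
""")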

In general, does the order in which the predicates appear in the where clause matter for performance, or will Spark re-order them itself?

Good question.

As I could not find suitable documentation, the answer was deduced from test scenarios and inference. This is a second attempt, since the various statements floating around on the web could not be backed up.

This question, I think, is not about AQE / Spark 3.x aspects. It is rather about, say, a DataFrame that is part of Stage N of a Spark App, has already passed the stage of acquiring data from sources at rest, and is now subject to filtering with multiple predicates applied.

The central point, then, is: does it matter how the predicates are ordered, or does Spark (Catalyst) re-order them to minimize the work to be done?

  • The premise here is that filtering the maximum amount of data out first makes more sense than first evaluating a predicate that filters very little out.
    • This is a well-known RDBMS point, referring to sargable predicates (whose definition has evolved over time).
      • A lot of that discussion focused on indexes, which Spark and Hive do not have, although DataFrames are columnar.

Point 1

You can try, in %sql, for example:

 EXPLAIN EXTENDED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;

From this you can see whether any re-arranging of predicates is going on; I saw no such aspects in the Physical Plan in non-AQE mode on Databricks. Refer to https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-explain.html.
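
To see this with actual predicates, a rough check (my own sketch, with hypothetical names) is to EXPLAIN a query whose WHERE clause has an expensive UDF predicate and a cheap one, and then look at the Filter node in the optimized and physical plans:

// Sketch only: build a small table, register a UDF, explain a two-predicate filter,
// then inspect the Filter node in the plans for any re-ordering of the conjuncts.
import org.apache.spark.sql.functions._

val t = spark.range(1000000)
  .withColumn("hc", (rand() * 1000000000).cast("long"))
  .withColumn("text", lpad(col("id").cast("string"), 3, "0"))
t.createOrReplaceTempView("tbl")

spark.udf.register("myUdf", (x: String) => x.length)

spark.sql("EXPLAIN EXTENDED select * from tbl where myUdf(text) = 3 and hc = -4").show(false)
// or, equivalently, on the DataFrame itself:
// t.where("myUdf(text) = 3 and hc = -4").explain(true)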

I read here and there that Catalyst can re-arrange filtering. To what extent would take a lot of research to establish; I was not able to confirm it.

Also an interesting read: https://www.waitingforcode.com/apache-spark-sql/catalyst-optimizer-in-spark-sql/read

Point 2

I ran the following admittedly contrived examples: the same functional query, but with the predicates reversed, using a column with high cardinality and testing for a value that does not in fact exist, and then compared the value of an accumulator that is incremented inside a UDF each time the UDF is called.

Scenario 1

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for $"..." and .toDF in a standalone app; already in scope in spark-shell / Databricks

// Generators for columns of differing cardinality
def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
def randomInt1to10 = scala.util.Random.nextInt(10) + 1
def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1

// 1M rows: nuid (medium cardinality), hc (high cardinality), lc (low cardinality)
val df = sc.parallelize(Seq.fill(1000000){ (randomInt1to1000000, randomInt1to1000000000, randomInt1to10) })
  .toDF("nuid", "hc", "lc")
  .withColumn("text", lpad($"nuid", 3, "0"))
  .withColumn("literal", lit(1))

// Counts how many times the UDF is actually invoked
val accumulator = sc.longAccumulator("udf_call_count")

spark.udf.register("myUdf", (x: String) => {
  accumulator.add(1)
  x.length
})

accumulator.reset()
// Expensive UDF predicate written first, cheap predicate (hc = -4, never true) second
df.where("myUdf(text) = 3 and hc = -4").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")

returns:

+---------+
|max(text)|
+---------+
|null     |
+---------+

Number of UDF calls 1000000 

Scenario 2

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for $"..." and .toDF in a standalone app; already in scope in spark-shell / Databricks

def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
def randomInt1to10 = scala.util.Random.nextInt(10) + 1
def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1

// Same 1M-row DataFrame as in Scenario 1
val dfA = sc.parallelize(Seq.fill(1000000){ (randomInt1to1000000, randomInt1to1000000000, randomInt1to10) })
  .toDF("nuid", "hc", "lc")
  .withColumn("text", lpad($"nuid", 3, "0"))
  .withColumn("literal", lit(1))

val accumulator = sc.longAccumulator("udf_call_count")

spark.udf.register("myUdf", (x: String) => {
  accumulator.add(1)
  x.length
})

accumulator.reset()
// Cheap predicate (hc = -4, never true) written first, expensive UDF predicate second
dfA.where("hc = -4 and myUdf(text) = 3").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")

returns:

+---------+
|max(text)|
+---------+
|null     |
+---------+

Number of UDF calls 0

My conclusions are:

  • Evaluation is left to right, in this case, since there were 0 calls to the UDF: the accumulator value in Scenario 2 is 0, as opposed to the 1M calls registered in Scenario 1.

  • Hence, the predicate-order processing that ORACLE and DB2 may perform does not appear to apply to these Stage 1 predicates.

Point 3

I note, however, the following from the manual (https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html):

Evaluation order and null checking

Spark SQL (including SQL and the DataFrame and Dataset APIs) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

Therefore, it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, and the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there’s no guarantee that the null check will happen before invoking the UDF. For example,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

This WHERE clause does not guarantee the strlen UDF to be invoked after filtering out nulls.

To perform proper null checking, we recommend that you do either of the following:

  • Make the UDF itself null-aware and do null checking inside the UDF itself.
  • Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch.

spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

A slight contradiction with the behaviour observed above.
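
If you want the cheap predicate to gate the expensive UDF regardless of what the optimizer decides, the documented IF / CASE WHEN pattern can be applied to the contrived example above (a sketch, reusing df, myUdf and the accumulator from Scenario 1):

// Sketch: wrap the UDF call in if(...) so it should only be evaluated when the cheap
// predicate (hc = -4, which never matches) has already passed.
accumulator.reset()
df.where("if(hc = -4, myUdf(text) = 3, false)").select(max($"text")).show(false)
println(s"Number of UDF calls ${accumulator.value}")  // expected to remain 0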