UDF 没有从哪些优化中获益?
Which optimizations do UDFs not benefit from?
Spark UDF 包含以下函数:nullable、deterministic、dataType 等。因此根据这些信息,它将受益于 ConstantFolding 等优化。它从哪些其他优化中受益,哪些优化不能从中受益?我问这个是因为许多演示文稿将 UDF 呈现为黑盒,它不会从催化剂优化中受益,但显然,它受益于 ConstantFolding。
Spark 通过将 UDF 包装在 class 中来处理它们。例如,当您编写以下内容时:
val example = udf((a: Int) => a * 2)
udf
函数所做的是创建一个 UserDefinedFunction
class,它在其 apply 函数中创建一个 ScalaUDF.
ScalaUDF extends Expression,并在其 doCodeGen 方法中执行以下:
...
val callFunc =
s"""
|$boxedType $resultTerm = null;
|try {
| $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult);
|} catch (Exception e) {
| throw new org.apache.spark.SparkException($errorMsgTerm, e);
|}
""".stripMargin
ev.copy(code =
code"""
|$evalCode
|${initArgs.mkString("\n")}
|$callFunc
...
此函数将 column/expression 的 DataType
转换为 Scala 类型(因为您的 UDF 在 Scala 类型上运行),然后调用您的 lambda。 deterministic,
nullable,
和 dataTypes
是用户定义函数的包装函数,因为它扩展了 Expression,而不是您的函数。如果您想从中充分受益,则必须编写一个扩展 Expression
或其子 class 之一的自定义表达式。
以下为例:
val redundantUdf = udf((a: Long) => true)
someDf.filter(redundantUdf(someDf("col1"))).explain()
优化后的逻辑计划如下所示:
Project [_1#5736 AS Type#5739, _2#5737L AS sts#5740L]
+- Filter UDF(_2#5737L)
+- LocalRelation [_1#5736, _2#5737L]
如您所见,它正在执行过滤器,即使它是多余的并且始终计算为真。
鉴于以下内容:
someDf.filter(expr("true")).explain()
会给出如下优化逻辑方案:
LocalRelation [Type#5739, sts#5740L]
它使用 PruneFilter 规则删除过滤器。
这并不意味着所有优化都被排除在外,有些优化仍然适用于 UDF,例如 CombineFilter
,它结合了来自两个过滤器的表达式,例如:
== Analyzed Logical Plan ==
_1: string, _2: string
Filter UDF(_1#2)
+- Filter UDF(_1#2)
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
Filter (UDF(_1#2) && UDF(_1#2))
+- LocalRelation [_1#2, _2#3]
此优化之所以有效,是因为它仅依赖于 deterministic
字段,并且默认情况下 UDF 是确定性的。因此 UDF 将受益于不依赖于它包装的函数的简单优化。这是因为它采用催化剂无法理解的格式,催化剂在树上运行,并且您的闭包是 Scala 函数。还有其他一些 UDF 丢失的地方,例如指定生成的 java 代码和 spark 类型信息。
Spark UDF 包含以下函数:nullable、deterministic、dataType 等。因此根据这些信息,它将受益于 ConstantFolding 等优化。它从哪些其他优化中受益,哪些优化不能从中受益?我问这个是因为许多演示文稿将 UDF 呈现为黑盒,它不会从催化剂优化中受益,但显然,它受益于 ConstantFolding。
Spark 通过将 UDF 包装在 class 中来处理它们。例如,当您编写以下内容时:
val example = udf((a: Int) => a * 2)
udf
函数所做的是创建一个 UserDefinedFunction
class,它在其 apply 函数中创建一个 ScalaUDF.
ScalaUDF extends Expression,并在其 doCodeGen 方法中执行以下:
...
val callFunc =
s"""
|$boxedType $resultTerm = null;
|try {
| $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult);
|} catch (Exception e) {
| throw new org.apache.spark.SparkException($errorMsgTerm, e);
|}
""".stripMargin
ev.copy(code =
code"""
|$evalCode
|${initArgs.mkString("\n")}
|$callFunc
...
此函数将 column/expression 的 DataType
转换为 Scala 类型(因为您的 UDF 在 Scala 类型上运行),然后调用您的 lambda。 deterministic,
nullable,
和 dataTypes
是用户定义函数的包装函数,因为它扩展了 Expression,而不是您的函数。如果您想从中充分受益,则必须编写一个扩展 Expression
或其子 class 之一的自定义表达式。
以下为例:
val redundantUdf = udf((a: Long) => true)
someDf.filter(redundantUdf(someDf("col1"))).explain()
优化后的逻辑计划如下所示:
Project [_1#5736 AS Type#5739, _2#5737L AS sts#5740L]
+- Filter UDF(_2#5737L)
+- LocalRelation [_1#5736, _2#5737L]
如您所见,它正在执行过滤器,即使它是多余的并且始终计算为真。
鉴于以下内容:
someDf.filter(expr("true")).explain()
会给出如下优化逻辑方案:
LocalRelation [Type#5739, sts#5740L]
它使用 PruneFilter 规则删除过滤器。
这并不意味着所有优化都被排除在外,有些优化仍然适用于 UDF,例如 CombineFilter
,它结合了来自两个过滤器的表达式,例如:
== Analyzed Logical Plan ==
_1: string, _2: string
Filter UDF(_1#2)
+- Filter UDF(_1#2)
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
Filter (UDF(_1#2) && UDF(_1#2))
+- LocalRelation [_1#2, _2#3]
此优化之所以有效,是因为它仅依赖于 deterministic
字段,并且默认情况下 UDF 是确定性的。因此 UDF 将受益于不依赖于它包装的函数的简单优化。这是因为它采用催化剂无法理解的格式,催化剂在树上运行,并且您的闭包是 Scala 函数。还有其他一些 UDF 丢失的地方,例如指定生成的 java 代码和 spark 类型信息。