Spark 函数 vs UDF 性能?
Spark functions vs UDF performance?
Spark 现在提供可在数据框中使用的预定义函数,而且它们似乎已经过高度优化。我最初的问题是哪个更快,但我自己做了一些测试,发现 spark 函数至少在一个实例中快了大约 10 倍。有谁知道为什么会这样,udf 什么时候会更快(仅适用于存在相同 spark 函数的情况)?
这是我的测试代码(运行 在 Databricks 社区编辑):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
UDF函数:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Spark 函数:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
运行 两次,udf 通常花费大约 1.1 - 1.4 秒,而 Spark concat
函数总是花费不到 0.15 秒。
when would a udf be faster
如果您询问 Python UDF,答案可能永远不会*。由于 SQL 函数相对简单并且不是为复杂任务设计的,因此几乎不可能补偿 Python 解释器和 JVM 之间重复序列化、反序列化和数据移动的成本。
Does anyone know why this is so
上面已经列举了主要原因,可以归结为一个简单的事实,Spark DataFrame
本身就是一个 JVM 结构,标准的访问方法是通过简单调用 Java API.另一方面,UDF 在 Python 中实现,需要来回移动数据。
虽然 PySpark 通常需要 JVM 和 Python 之间的数据移动,但在低级别 RDD API 的情况下,它通常不需要昂贵的 serde activity。 Spark SQL 增加了序列化和序列化的额外成本,以及将数据从 JVM 上的不安全表示移动到不安全表示的成本。后一种特定于所有 UDF(Python、Scala 和 Java),而前一种特定于非本地语言。
与 UDF 不同,Spark SQL 函数直接在 JVM 上运行,并且通常与 Catalyst 和 Tungsten 很好地集成。这意味着这些可以在执行计划中进行优化,并且大多数时候可以受益于 codgen 和其他 Tungsten 优化。此外,这些可以在其 "native" 表示中对数据进行操作。
所以从某种意义上说,这里的问题是 Python UDF 必须将数据带到代码中,而 SQL 表达式则相反。
* 根据 rough estimates PySpark window UDF 可以击败 Scala window 函数。
多年后,当我有了更多的火花知识并重新审视这个问题时,才意识到@alfredox 真正想问的是什么。所以我又修改了一遍,把答案分为两部分:
回答为什么原生 DF 函数(原生 Spark-SQL 函数)更快:
基本上,为什么原生 Spark 函数总是比 Spark UDF 快,无论您的 UDF 是在 Python 还是 Scala 中实现的。
首先,我们需要了解什么是Tungsten, which is firstly introduced in Spark 1.4。
它是一个后端及其关注点:
- Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,
- Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates,
- Whole-Stage Code Generation (aka CodeGen).
Spark 最大的性能杀手之一是 GC。 GC 将暂停 JVM 中的每个线程,直到 GC 完成。这正是引入 Off-Heap 内存管理的原因。
执行Spark-SQL原生函数时,数据会留在tungsten后端。但是在Spark UDF场景下,数据会从tungsten移出到JVM(Scala场景)或者JVM和PythonProcess(Python)做实际的处理,然后再移回tungsten。结果:
- 不可避免地,会有开销/惩罚:
- 反序列化来自 tungsten 的输入。
- 将输出序列化回钨。
- 即使使用 Spark 中的 first-class 公民 Scala,它也会增加 JVM 中的内存占用,并且可能会涉及更多 JVM 中的 GC。
这个问题正是 tungsten "Off-Heap Memory Management" 功能试图解决的问题。
回答 Python 是否一定比 Scala 慢:
自 2017 年 10 月 30 日起,Spark 刚刚为 pyspark 引入了矢量化 udfs。
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Python UDF 很慢的原因,可能是 PySpark UDF 没有以最优化的方式实现:
根据 link 中的段落。
Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead.
然而,新矢量化的 udfs 似乎大大提高了性能:
ranging from 3x to over 100x.
在恢复使用您自己的自定义 UDF 函数之前,尽可能使用更高级别的标准基于列的函数和数据集运算符,因为 UDF 是 Spark 的 BlackBox,所以它确实如此甚至不尝试优化它们。
屏幕背后实际发生的事情是,Catalyst 根本无法处理和优化 UDF,并且将它们作为 BlackBox 进行威胁,从而导致失去许多优化,例如 Predicate pushdown、Constant folding 等。
Spark 现在提供可在数据框中使用的预定义函数,而且它们似乎已经过高度优化。我最初的问题是哪个更快,但我自己做了一些测试,发现 spark 函数至少在一个实例中快了大约 10 倍。有谁知道为什么会这样,udf 什么时候会更快(仅适用于存在相同 spark 函数的情况)?
这是我的测试代码(运行 在 Databricks 社区编辑):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
UDF函数:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Spark 函数:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
运行 两次,udf 通常花费大约 1.1 - 1.4 秒,而 Spark concat
函数总是花费不到 0.15 秒。
when would a udf be faster
如果您询问 Python UDF,答案可能永远不会*。由于 SQL 函数相对简单并且不是为复杂任务设计的,因此几乎不可能补偿 Python 解释器和 JVM 之间重复序列化、反序列化和数据移动的成本。
Does anyone know why this is so
上面已经列举了主要原因,可以归结为一个简单的事实,Spark DataFrame
本身就是一个 JVM 结构,标准的访问方法是通过简单调用 Java API.另一方面,UDF 在 Python 中实现,需要来回移动数据。
虽然 PySpark 通常需要 JVM 和 Python 之间的数据移动,但在低级别 RDD API 的情况下,它通常不需要昂贵的 serde activity。 Spark SQL 增加了序列化和序列化的额外成本,以及将数据从 JVM 上的不安全表示移动到不安全表示的成本。后一种特定于所有 UDF(Python、Scala 和 Java),而前一种特定于非本地语言。
与 UDF 不同,Spark SQL 函数直接在 JVM 上运行,并且通常与 Catalyst 和 Tungsten 很好地集成。这意味着这些可以在执行计划中进行优化,并且大多数时候可以受益于 codgen 和其他 Tungsten 优化。此外,这些可以在其 "native" 表示中对数据进行操作。
所以从某种意义上说,这里的问题是 Python UDF 必须将数据带到代码中,而 SQL 表达式则相反。
* 根据 rough estimates PySpark window UDF 可以击败 Scala window 函数。
多年后,当我有了更多的火花知识并重新审视这个问题时,才意识到@alfredox 真正想问的是什么。所以我又修改了一遍,把答案分为两部分:
回答为什么原生 DF 函数(原生 Spark-SQL 函数)更快:
基本上,为什么原生 Spark 函数总是比 Spark UDF 快,无论您的 UDF 是在 Python 还是 Scala 中实现的。
首先,我们需要了解什么是Tungsten, which is firstly introduced in Spark 1.4。
它是一个后端及其关注点:
- Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,
- Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates,
- Whole-Stage Code Generation (aka CodeGen).
Spark 最大的性能杀手之一是 GC。 GC 将暂停 JVM 中的每个线程,直到 GC 完成。这正是引入 Off-Heap 内存管理的原因。
执行Spark-SQL原生函数时,数据会留在tungsten后端。但是在Spark UDF场景下,数据会从tungsten移出到JVM(Scala场景)或者JVM和PythonProcess(Python)做实际的处理,然后再移回tungsten。结果:
- 不可避免地,会有开销/惩罚:
- 反序列化来自 tungsten 的输入。
- 将输出序列化回钨。
- 即使使用 Spark 中的 first-class 公民 Scala,它也会增加 JVM 中的内存占用,并且可能会涉及更多 JVM 中的 GC。 这个问题正是 tungsten "Off-Heap Memory Management" 功能试图解决的问题。
回答 Python 是否一定比 Scala 慢:
自 2017 年 10 月 30 日起,Spark 刚刚为 pyspark 引入了矢量化 udfs。
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Python UDF 很慢的原因,可能是 PySpark UDF 没有以最优化的方式实现:
根据 link 中的段落。
Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead.
然而,新矢量化的 udfs 似乎大大提高了性能:
ranging from 3x to over 100x.
在恢复使用您自己的自定义 UDF 函数之前,尽可能使用更高级别的标准基于列的函数和数据集运算符,因为 UDF 是 Spark 的 BlackBox,所以它确实如此甚至不尝试优化它们。
屏幕背后实际发生的事情是,Catalyst 根本无法处理和优化 UDF,并且将它们作为 BlackBox 进行威胁,从而导致失去许多优化,例如 Predicate pushdown、Constant folding 等。