问题 运行 大型数据集上的 Pandas UDF

Problem running a Pandas UDF on a large dataset

我目前正在做一个项目,我很难理解 PySpark 中的 Pandas UDF 是如何工作的。

我有一个 Spark 集群,它有一个 8 核和 64GB 的主节点,以及两个 16 核和 112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。数据集由 70 列组成。 我定义了一个 Pandas UDF 来对数据集进行一些操作,只能在 Pandas 数据帧上使用 Python 来完成。

pandas UDF 是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

绝对没有办法让 Pandas UDF 工作,因为它甚至在执行操作之前就崩溃了。我怀疑某处有 OOM 错误。 运行s 以上的代码在崩溃前持续了几分钟,错误代码表明连接已重置。 但是,如果我在一个分区上过滤后调用.toPandas() 函数然后显示它,运行 没问题,没有错误。该错误似乎仅在使用 PandasUDF 时发生。

我不明白它是如何工作的。 Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驱动内存?执行人的?如果它在驱动程序上,是否所有 Python 代码都在其上执行?

集群配置如下:

我是不是遗漏了什么,或者只是没有办法通过 PandasUDF 运行 78M 行?

Does Spark try to convert one whole partition at once (78M lines) ?

事情就是这样。 Spark 3.0 添加了对分块 UDF 的支持,它对 Pandas DataFramesSeries 的迭代器进行操作,但如果对数据集进行 操作,则只能使用 Python,在 Pandas 数据框 上,这些可能不是您的正确选择。

If so, what memory does it use ? The driver memory? The executor's?

每个分区都在各自的执行器上本地处理,并且使用 Arrow 流将数据传入和传出 Python worker。

Am I missing something or is there just no way to run 78M lines through a PandasUDF?

只要您有足够的内存来处理 Arrow 输入、输出(尤其是复制数据时)、辅助数据结构以及 JVM 开销,它应该可以很好地处理大型数据集。

但在如此小的集群上,您最好直接使用 Pandas 分区输出和读取数据,而根本不使用 Spark。这样您就可以使用所有可用资源(即 > 100GB / 解释器)进行数据处理,而不是将这些资源浪费在次要任务上(有 16GB - 开销 / 解释器)。

回答有关在大型 pyspark 数据帧上使用 Pandas UDF 的一般问题:

如果您遇到内存不足的错误,例如 java.lang.OutOfMemoryError : GC overhead limit exceededjava.lang.OutOfMemoryError: Java heap space 并且增加内存限制无效,请确保启用了 pyarrow。默认情况下它是禁用的。

在 pyspark 中,您可以使用以下方式启用它:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

更多信息here