问题 运行 大型数据集上的 Pandas UDF
Problem running a Pandas UDF on a large dataset
我目前正在做一个项目,我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,它有一个 8 核和 64GB 的主节点,以及两个 16 核和 112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。数据集由 70 列组成。
我定义了一个 Pandas UDF 来对数据集进行一些操作,只能在 Pandas 数据帧上使用 Python 来完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
绝对没有办法让 Pandas UDF 工作,因为它甚至在执行操作之前就崩溃了。我怀疑某处有 OOM 错误。 运行s 以上的代码在崩溃前持续了几分钟,错误代码表明连接已重置。
但是,如果我在一个分区上过滤后调用.toPandas() 函数然后显示它,运行 没问题,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。 Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驱动内存?执行人的?如果它在驱动程序上,是否所有 Python 代码都在其上执行?
集群配置如下:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=64g
- spark.executor.cores 2
- spark.executor.memory 30g(为 python 实例留出内存)
- spark.driver.memory 43g
我是不是遗漏了什么,或者只是没有办法通过 PandasUDF 运行 78M 行?
Does Spark try to convert one whole partition at once (78M lines) ?
事情就是这样。 Spark 3.0 添加了对分块 UDF 的支持,它对 Pandas DataFrames
或 Series
的迭代器进行操作,但如果对数据集进行 操作,则只能使用 Python,在 Pandas 数据框 上,这些可能不是您的正确选择。
If so, what memory does it use ? The driver memory? The executor's?
每个分区都在各自的执行器上本地处理,并且使用 Arrow 流将数据传入和传出 Python worker。
Am I missing something or is there just no way to run 78M lines through a PandasUDF?
只要您有足够的内存来处理 Arrow 输入、输出(尤其是复制数据时)、辅助数据结构以及 JVM 开销,它应该可以很好地处理大型数据集。
但在如此小的集群上,您最好直接使用 Pandas 分区输出和读取数据,而根本不使用 Spark。这样您就可以使用所有可用资源(即 > 100GB / 解释器)进行数据处理,而不是将这些资源浪费在次要任务上(有 16GB - 开销 / 解释器)。
回答有关在大型 pyspark 数据帧上使用 Pandas UDF 的一般问题:
如果您遇到内存不足的错误,例如
java.lang.OutOfMemoryError : GC overhead limit exceeded
或 java.lang.OutOfMemoryError: Java heap space
并且增加内存限制无效,请确保启用了 pyarrow。默认情况下它是禁用的。
在 pyspark 中,您可以使用以下方式启用它:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
更多信息here。
我目前正在做一个项目,我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,它有一个 8 核和 64GB 的主节点,以及两个 16 核和 112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。数据集由 70 列组成。 我定义了一个 Pandas UDF 来对数据集进行一些操作,只能在 Pandas 数据帧上使用 Python 来完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
绝对没有办法让 Pandas UDF 工作,因为它甚至在执行操作之前就崩溃了。我怀疑某处有 OOM 错误。 运行s 以上的代码在崩溃前持续了几分钟,错误代码表明连接已重置。 但是,如果我在一个分区上过滤后调用.toPandas() 函数然后显示它,运行 没问题,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。 Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驱动内存?执行人的?如果它在驱动程序上,是否所有 Python 代码都在其上执行?
集群配置如下:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=64g
- spark.executor.cores 2
- spark.executor.memory 30g(为 python 实例留出内存)
- spark.driver.memory 43g
我是不是遗漏了什么,或者只是没有办法通过 PandasUDF 运行 78M 行?
Does Spark try to convert one whole partition at once (78M lines) ?
事情就是这样。 Spark 3.0 添加了对分块 UDF 的支持,它对 Pandas DataFrames
或 Series
的迭代器进行操作,但如果对数据集进行 操作,则只能使用 Python,在 Pandas 数据框 上,这些可能不是您的正确选择。
If so, what memory does it use ? The driver memory? The executor's?
每个分区都在各自的执行器上本地处理,并且使用 Arrow 流将数据传入和传出 Python worker。
Am I missing something or is there just no way to run 78M lines through a PandasUDF?
只要您有足够的内存来处理 Arrow 输入、输出(尤其是复制数据时)、辅助数据结构以及 JVM 开销,它应该可以很好地处理大型数据集。
但在如此小的集群上,您最好直接使用 Pandas 分区输出和读取数据,而根本不使用 Spark。这样您就可以使用所有可用资源(即 > 100GB / 解释器)进行数据处理,而不是将这些资源浪费在次要任务上(有 16GB - 开销 / 解释器)。
回答有关在大型 pyspark 数据帧上使用 Pandas UDF 的一般问题:
如果您遇到内存不足的错误,例如
java.lang.OutOfMemoryError : GC overhead limit exceeded
或 java.lang.OutOfMemoryError: Java heap space
并且增加内存限制无效,请确保启用了 pyarrow。默认情况下它是禁用的。
在 pyspark 中,您可以使用以下方式启用它:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
更多信息here。