PySpark pandas_udfs java.lang.IllegalArgumentException error
Does anyone have experience running pandas UDFs in a local pyspark session on Windows? I've used them on Linux with good results, but I've had no luck on my Windows machine.
Environment:
python==3.7
pyarrow==0.15
pyspark==2.3.4
pandas==0.24
java version "1.8.0_74"
Example script:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())
out_df = df.groupby("id").apply(subtract_mean).toPandas()
print(out_df.head())
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
After running for quite a while (it splits the toPandas stage into 200 tasks, each taking about a second), it comes back with an error like this:
Traceback (most recent call last):
File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 1953, in toPandas
tables = self._collectAsArrow()
File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 2004, in _collectAsArrow
sock_info = self._jdf.collectAsArrowToPython()
File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o62.collectAsArrowToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 69 in stage 3.0 failed 1 times, most recent failure: Lost task 69.0 in stage 3.0 (TID 201, localhost, executor driver): java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(Unknown Source)
at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNextMessage(MessageChannelReader.java:64)
at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeSchema(MessageSerializer.java:104)
at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:128)
at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:161)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:121)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon.hasNext(ArrowConverters.scala:96)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon.foreach(ArrowConverters.scala:94)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon.to(ArrowConverters.scala:94)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon.toBuffer(ArrowConverters.scala:94)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon.toArray(ArrowConverters.scala:94)
at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
The java.lang.IllegalArgumentException in your pandas_udf is related to the pyarrow version, not to the OS environment. See this issue for details.
You have two courses of action:
- Downgrade pyarrow to v0.14, or
- Add the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 to SPARK_HOME/conf/spark-env.sh
- On Windows, you need a spark-env.cmd file in the conf directory instead: set ARROW_PRE_0_15_IPC_FORMAT=1 (a sketch of both files follows this list)
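For reference, a minimal sketch of what those two files could contain (the exact SPARK_HOME location depends on your install, so treat the paths as assumptions):

# SPARK_HOME/conf/spark-env.sh (Linux/macOS) -- sourced by Spark's launch scripts,
# so export the flag to make it visible to the executors' Python workers
export ARROW_PRE_0_15_IPC_FORMAT=1

REM SPARK_HOME/conf/spark-env.cmd (Windows) -- the same flag in batch-file syntax
set ARROW_PRE_0_15_IPC_FORMAT=1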
An addition to Sergey's answer:
If you prefer to build your own SparkSession in Python without touching the config files, you need to set both spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT and the corresponding environment variable for the local executors, spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT:
spark_session = SparkSession.builder \
    .master("yarn") \
    .config('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', 1) \
    .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)

spark = spark_session.getOrCreate()
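For the local-mode session from the question (no YARN), a minimal sketch of the same workaround, assuming that locally spawned Python workers inherit the driver's environment, is to export the flag in the driver before the SparkSession is created:

import os
from pyspark.sql import SparkSession

# Assumption: setting the legacy Arrow IPC flag before the JVM and workers start
# makes both driver-side and worker-side pyarrow use the pre-0.15 stream format.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (SparkSession.builder
         .master("local")
         .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
         .getOrCreate())
spark.conf.set("spark.sql.execution.arrow.enabled", "true")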
Hope this helps!