PySpark:带有标量 Pandas UDF 的无效 returnType
PySpark: Invalid returnType with scalar Pandas UDFs
我正在尝试 return 来自 pandas_udf 的特定结构。它在一个集群上工作但在另一个集群上失败。
我尝试 运行 组上的 udf,这需要 return 类型作为数据框。
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
StructField("Distance", FloatType()),
StructField("CarId", IntegerType())
])
def haversine(lon1, lat1, lon2, lat2):
#Calculate distance, return scalar
return 3.5 # Removed logic to facilitate reading
@pandas_udf(schema)
def totalDistance(oneCar):
dist = haversine(oneCar.Longtitude.shift(1),
oneCar.Latitude.shift(1),
oneCar.loc[1:, 'Longitude'],
oneCar.loc[1:, 'Latitude'])
return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)
这是我遇到的异常:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
114 try:
--> 115 to_arrow_type(self._returnType_placeholder)
116 except TypeError:
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
1641 else:
-> 1642 raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
1643 return arrow_type
TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
18 km = 6367 * c
19 return km
---> 20 @pandas_udf("CarId: int, Distance: float")
21 def totalDistance(oneUser):
22 dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
62 udf_obj = UserDefinedFunction(
63 f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64 return udf_obj._wrapped()
65
66
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
184
185 wrapper.func = self.func
--> 186 wrapper.returnType = self.returnType
187 wrapper.evalType = self.evalType
188 wrapper.deterministic = self.deterministic
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
117 raise NotImplementedError(
118 "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119 "not supported" % str(self._returnType_placeholder))
120 elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
121 if isinstance(self._returnType_placeholder, StructType):
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
我也试过将架构更改为
@pandas_udf("<CarId:int,Distance:float>")
和
@pandas_udf("CarId:int,Distance:float")
但得到同样的异常。我怀疑这与我的 pyarrow 版本有关,它与我的 pyspark 版本不兼容。
如有任何帮助,我们将不胜感激。谢谢!
正如错误消息 ("Invalid returnType with scalar Pandas UDFs" 中所报告的那样,您正在尝试创建标量矢量化 pandas UDF,但使用的是 StructType 模式和返回一个 pandas DataFrame。
您应该将函数声明为 GROUPED MAP pandas UDF,即:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
pyspark 文档中解释了标量和分组矢量化 UDF 之间的区别:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf。
A scalar UDF defines a transformation: One or more pandas.Series -> A pandas.Series. The returnType should be a primitive data type, e.g., DoubleType(). The length of the returned pandas.Series must be of the same as the input pandas.Series.
总而言之,标量 pandas UDF 一次处理一列(pandas 系列),从而比一次处理一个行元素的传统 UDF 具有更好的性能。请注意,性能改进归功于使用 PyArrow 的高效 python 序列化。
A grouped map UDF defines transformation: A pandas.DataFrame -> A pandas.DataFrame The returnType should be a StructType describing the schema of the returned pandas.DataFrame. The length of the returned pandas.DataFrame can be arbitrary and the columns must be indexed so that their position matches the corresponding field in the schema.
分组的 pandas UDF 一次处理多行和多列(使用 pandas DataFrame,不要与 Spark DataFrame 混淆),并且对于多变量操作非常有用和高效(尤其是在使用本地 python 数值分析和机器学习库时,如 numpy、scipy、scikit-learn 等)。在这种情况下,输出是一个包含多列的单行 DataFrame。
请注意,我没有检查代码的内部逻辑,只检查了方法论。
我正在尝试 return 来自 pandas_udf 的特定结构。它在一个集群上工作但在另一个集群上失败。 我尝试 运行 组上的 udf,这需要 return 类型作为数据框。
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
StructField("Distance", FloatType()),
StructField("CarId", IntegerType())
])
def haversine(lon1, lat1, lon2, lat2):
#Calculate distance, return scalar
return 3.5 # Removed logic to facilitate reading
@pandas_udf(schema)
def totalDistance(oneCar):
dist = haversine(oneCar.Longtitude.shift(1),
oneCar.Latitude.shift(1),
oneCar.loc[1:, 'Longitude'],
oneCar.loc[1:, 'Latitude'])
return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)
这是我遇到的异常:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
114 try:
--> 115 to_arrow_type(self._returnType_placeholder)
116 except TypeError:
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
1641 else:
-> 1642 raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
1643 return arrow_type
TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
18 km = 6367 * c
19 return km
---> 20 @pandas_udf("CarId: int, Distance: float")
21 def totalDistance(oneUser):
22 dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
62 udf_obj = UserDefinedFunction(
63 f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64 return udf_obj._wrapped()
65
66
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
184
185 wrapper.func = self.func
--> 186 wrapper.returnType = self.returnType
187 wrapper.evalType = self.evalType
188 wrapper.deterministic = self.deterministic
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
117 raise NotImplementedError(
118 "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119 "not supported" % str(self._returnType_placeholder))
120 elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
121 if isinstance(self._returnType_placeholder, StructType):
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
我也试过将架构更改为
@pandas_udf("<CarId:int,Distance:float>")
和
@pandas_udf("CarId:int,Distance:float")
但得到同样的异常。我怀疑这与我的 pyarrow 版本有关,它与我的 pyspark 版本不兼容。
如有任何帮助,我们将不胜感激。谢谢!
正如错误消息 ("Invalid returnType with scalar Pandas UDFs" 中所报告的那样,您正在尝试创建标量矢量化 pandas UDF,但使用的是 StructType 模式和返回一个 pandas DataFrame。
您应该将函数声明为 GROUPED MAP pandas UDF,即:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
pyspark 文档中解释了标量和分组矢量化 UDF 之间的区别:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf。
A scalar UDF defines a transformation: One or more pandas.Series -> A pandas.Series. The returnType should be a primitive data type, e.g., DoubleType(). The length of the returned pandas.Series must be of the same as the input pandas.Series.
总而言之,标量 pandas UDF 一次处理一列(pandas 系列),从而比一次处理一个行元素的传统 UDF 具有更好的性能。请注意,性能改进归功于使用 PyArrow 的高效 python 序列化。
A grouped map UDF defines transformation: A pandas.DataFrame -> A pandas.DataFrame The returnType should be a StructType describing the schema of the returned pandas.DataFrame. The length of the returned pandas.DataFrame can be arbitrary and the columns must be indexed so that their position matches the corresponding field in the schema.
分组的 pandas UDF 一次处理多行和多列(使用 pandas DataFrame,不要与 Spark DataFrame 混淆),并且对于多变量操作非常有用和高效(尤其是在使用本地 python 数值分析和机器学习库时,如 numpy、scipy、scikit-learn 等)。在这种情况下,输出是一个包含多列的单行 DataFrame。
请注意,我没有检查代码的内部逻辑,只检查了方法论。