class 内的 Pyspark User-Defined_functions
Pyspark User-Defined_functions inside of a class
我正在尝试在 python class 中创建一个 Spark-UDF。意思是,class 中的方法之一是 UDF。我收到一个名为的错误
“ PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象”
环境:Azure Databricks。 (DBR 6.1 测试版)
代码执行:在内置笔记本中。
Python 版本:3.5
星火版本:2.4.4
我尝试在 class 之外的单独单元格中定义 UDF,并且 UDF 有效。我不想写那样的代码,我需要遵循 OOP 原则并希望保持结构化。
我在 Google 上尝试了所有方法,但没有帮助。事实上,我什至没有得到关于我得到的错误的信息。
“ PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象”
class phases():
def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
print("Inside the constructor of Class phases ")
#I need the below 2 variables to be used in my UDF, so i am trying to put
them in a class
self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg
#This is the UDF.
def phases_commence(self,each_row):
print(a)
return 1
#This is the function that registers the UDF,
def initiate_the_phases_on_the_major_track_segment(self):
print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")
#registering the UDF
self.phases_udf = udf(self.phases_commence,LongType())
new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf((struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))))
display(new_df)
#This is a method in a different notebook that creates an object for the above shown class and calls the methods that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
phase_obj.initiate_the_phases_on_the_major_track_segment()
错误信息是:
PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象
您的函数需要是静态的才能将其定义为 udf。我正在寻找一些文档来提供一个很好的解释,但找不到它。
基本上(可能不是 100% 准确;欢迎更正)当您定义一个 udf 时,它会自动被 pickle 并复制到每个执行程序,但您不能 pickle a single method of a class which is not defined at the top level (the class is part of the top level but not its methods). Have a look at this post 除了静态方法之外的变通方法。
import pyspark.sql.functions as F
import pyspark.sql.types as T
class Phases():
def __init__(self, df1):
print("Inside the constructor of Class phases ")
self.df1 = df1
self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())
#This is the UDF.
@staticmethod
def phases_commence(age):
age = age +3
return age
#This is the function that registers the UDF,
def doSomething(self):
print("Inside the doSomething")
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))
l =[(1, 10 , 'F')
,(2 , 2 , 'M')
,(2 , 10 , 'F')
,(2 , 3 , 'F')
,(3 , 10, 'M')]
columns = ['id', 'Age', 'Gender']
df=spark.createDataFrame(l, columns)
bla = Phases(df)
bla.doSomething()
bla.df1.show()
输出:
Inside the constructor of Class phases
Inside the 'initiate_the_phases_on_the_major_track_segment()'
+---+---+------+-----+
| id|Age|Gender|AgeP2|
+---+---+------+-----+
| 1| 10| F| 13|
| 2| 2| M| 5|
| 2| 10| F| 13|
| 2| 3| F| 6|
| 3| 10| M| 13|
+---+---+------+-----+
@udf 糖的语法稍微简单了...
class Phases:
def __init__(self, df1):
self.df1 = df1
#This is the UDF, double-decorated..
@staticmethod
@udf(returnType=IntegerType())
def phases_udf(age):
age += 3
return age
#This is the function that registers the UDF
def doSomething(self):
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(col('Age')))
我正在尝试在 python class 中创建一个 Spark-UDF。意思是,class 中的方法之一是 UDF。我收到一个名为的错误 “ PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象”
环境:Azure Databricks。 (DBR 6.1 测试版) 代码执行:在内置笔记本中。 Python 版本:3.5 星火版本:2.4.4
我尝试在 class 之外的单独单元格中定义 UDF,并且 UDF 有效。我不想写那样的代码,我需要遵循 OOP 原则并希望保持结构化。 我在 Google 上尝试了所有方法,但没有帮助。事实上,我什至没有得到关于我得到的错误的信息。 “ PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象”
class phases():
def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
print("Inside the constructor of Class phases ")
#I need the below 2 variables to be used in my UDF, so i am trying to put
them in a class
self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg
#This is the UDF.
def phases_commence(self,each_row):
print(a)
return 1
#This is the function that registers the UDF,
def initiate_the_phases_on_the_major_track_segment(self):
print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")
#registering the UDF
self.phases_udf = udf(self.phases_commence,LongType())
new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf((struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))))
display(new_df)
#This is a method in a different notebook that creates an object for the above shown class and calls the methods that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
phase_obj.initiate_the_phases_on_the_major_track_segment()
错误信息是: PicklingError:无法序列化对象:TypeError:无法腌制 _MovedItems 对象
您的函数需要是静态的才能将其定义为 udf。我正在寻找一些文档来提供一个很好的解释,但找不到它。
基本上(可能不是 100% 准确;欢迎更正)当您定义一个 udf 时,它会自动被 pickle 并复制到每个执行程序,但您不能 pickle a single method of a class which is not defined at the top level (the class is part of the top level but not its methods). Have a look at this post 除了静态方法之外的变通方法。
import pyspark.sql.functions as F
import pyspark.sql.types as T
class Phases():
def __init__(self, df1):
print("Inside the constructor of Class phases ")
self.df1 = df1
self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())
#This is the UDF.
@staticmethod
def phases_commence(age):
age = age +3
return age
#This is the function that registers the UDF,
def doSomething(self):
print("Inside the doSomething")
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))
l =[(1, 10 , 'F')
,(2 , 2 , 'M')
,(2 , 10 , 'F')
,(2 , 3 , 'F')
,(3 , 10, 'M')]
columns = ['id', 'Age', 'Gender']
df=spark.createDataFrame(l, columns)
bla = Phases(df)
bla.doSomething()
bla.df1.show()
输出:
Inside the constructor of Class phases
Inside the 'initiate_the_phases_on_the_major_track_segment()'
+---+---+------+-----+
| id|Age|Gender|AgeP2|
+---+---+------+-----+
| 1| 10| F| 13|
| 2| 2| M| 5|
| 2| 10| F| 13|
| 2| 3| F| 6|
| 3| 10| M| 13|
+---+---+------+-----+
@udf 糖的语法稍微简单了...
class Phases:
def __init__(self, df1):
self.df1 = df1
#This is the UDF, double-decorated..
@staticmethod
@udf(returnType=IntegerType())
def phases_udf(age):
age += 3
return age
#This is the function that registers the UDF
def doSomething(self):
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(col('Age')))