PySpark user-defined functions inside of a class

I am trying to create a Spark UDF inside a Python class, i.e. one of the methods of the class is the UDF. I get an error: "PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects".

Environment: Azure Databricks (DBR 6.1 Beta). Code execution: in a notebook. Python version: 3.5. Spark version: 2.4.4.

I tried defining the UDF in a separate cell outside of the class, and the UDF works. I don't want to write the code that way; I need to follow OOP principles and want to keep it structured. I have tried everything I could find on Google, but nothing helped. In fact, I could not even find any information about the error I am getting: "PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects".
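For reference, the top-level version that does work looks roughly like this (a sketch of my working cell; each_mp_pair_df stands in for my actual DataFrame):

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import LongType

#Defined at the top level of its own cell, this function pickles and runs fine
def phases_commence(each_row):
  return 1

phases_udf = udf(phases_commence, LongType())
new_df = each_mp_pair_df.withColumn(
  "status",
  phases_udf(struct([each_mp_pair_df[x] for x in each_mp_pair_df.columns]))
)

The class-based version below is the one that fails with the pickling error: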

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import LongType


class phases():
  def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
    print("Inside the constructor of Class phases ")

    #I need the below 2 variables to be used in my UDF, so I am trying to put them in a class
    self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
    self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg

  #This is the UDF. 
  def phases_commence(self, each_row):
    print(each_row)
    return 1

  #This is the function that registers the UDF, 
  def initiate_the_phases_on_the_major_track_segment(self):
    print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")

    #registering the UDF
    self.phases_udf = udf(self.phases_commence, LongType())
    new_df = self.each_mp_pair_phases_df.withColumn(
      "status",
      self.phases_udf(struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))
    )
    display(new_df)
#This is a function in a different notebook that creates an object of the above class and calls the method that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):

  phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
  phase_obj.initiate_the_phases_on_the_major_track_segment()

The error message is: PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects

Your function needs to be static in order to define it as a udf. I was looking for some documentation that gives a good explanation of this, but could not find it.

Basically (maybe not 100% accurate; corrections are welcome), when you define a udf it gets pickled and copied to each executor automatically, but you can't pickle a single method of a class which is not defined at the top level (the class is part of the top level but not its methods). Have a look at this post for workarounds other than static methods.
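One such workaround (a minimal sketch of my own, not taken from that post; PhasesClosure, offset and add_offset are made-up names) is to build the UDF from a local function that closes over plain values instead of over self, so nothing bound to the instance has to be pickled:

import pyspark.sql.functions as F
import pyspark.sql.types as T


class PhasesClosure():
  def __init__(self, df1):
    self.df1 = df1
    self.offset = 3            # plain value the UDF needs

  def doSomething(self):
    offset = self.offset       # copy the value out of self first
    def add_offset(age):       # local function: closes over 'offset' only, not over self
      return age + offset
    add_offset_udf = F.udf(add_offset, T.IntegerType())
    self.df1 = self.df1.withColumn('AgeP2', add_offset_udf(F.col('Age')))

The static-method version below sidesteps the problem in the same way: the function shipped to the executors no longer drags self (and the DataFrames it holds) along with it.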

import pyspark.sql.functions as F
import pyspark.sql.types as T


class Phases():
  def __init__(self, df1):
    print("Inside the constructor of Class phases ")

    self.df1 = df1
    self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())

  #This is the UDF. 
  @staticmethod
  def phases_commence(age):
    age = age +3
    return age

  #This is the function that applies the UDF
  def doSomething(self):
    print("Inside the doSomething")
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))

l = [(1, 10, 'F'),
     (2,  2, 'M'),
     (2, 10, 'F'),
     (2,  3, 'F'),
     (3, 10, 'M')]

columns = ['id', 'Age', 'Gender']

df = spark.createDataFrame(l, columns)

bla = Phases(df)
bla.doSomething()
bla.df1.show()

Output:

Inside the constructor of Class phases 
Inside the doSomething 
+---+---+------+-----+ 
| id|Age|Gender|AgeP2| 
+---+---+------+-----+ 
|  1| 10|     F|   13| 
|  2|  2|     M|    5| 
|  2| 10|     F|   13| 
|  2|  3|     F|    6| 
|  3| 10|     M|   13| 
+---+---+------+-----+

The syntax gets slightly simpler with the @udf decorator sugar...

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType


class Phases:
  def __init__(self, df1):
    self.df1 = df1

  #This is the UDF, double-decorated.. 
  @staticmethod
  @udf(returnType=IntegerType())
  def phases_udf(age):
    age += 3
    return age

  #This is the function that applies the UDF
  def doSomething(self):
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(col('Age')))
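
Usage is the same as before (note that @staticmethod has to sit above @udf so that self.phases_udf resolves to the plain UDF object rather than a bound method):

bla = Phases(df)
bla.doSomething()
bla.df1.show()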