如何将函数应用于 PySpark DataFrame 指定列的每一行
How to apply function to each row of specified column of PySpark DataFrame
我有一个由三列组成的 PySpark DataFrame,其结构如下。
In[1]: df.take(1)
Out[1]:
[Row(angle_est=-0.006815859163590619, rwsep_est=0.00019571401752467945, cost_est=34.33651951754235)]
我想要做的是检索第一列 (angle_est
) 的每个值,并将其作为参数 xMisallignment
传递给定义的函数以设置特定的 属性 class 对象。定义的函数是:
def setMisAllignment(self, xMisallignment):
if np.abs(xMisallignment) > 0.8:
warnings.warn('You might set misallignment angle too large.')
self.MisAllignment = xMisallignment
我正在尝试 select 第一列并将其转换为 rdd,并将上述函数应用于 map() 函数,但它似乎不起作用, MisAllignment
做到了无论如何都不会改变。
df.select(df.angle_est).rdd.map(lambda row: model0.setMisAllignment(row))
In[2]: model0.MisAllignment
Out[2]: 0.00111511718224
任何人有想法帮助我让那个功能工作?提前致谢!
您可以将您的函数注册为类似于以下内容的 spark UDF:
spark.udf.register("misallign", setMisAllignment)
您可以在此测试套件中获得许多创建和注册 UDF 的示例:
https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
希望它能回答您的问题
我有一个由三列组成的 PySpark DataFrame,其结构如下。
In[1]: df.take(1)
Out[1]:
[Row(angle_est=-0.006815859163590619, rwsep_est=0.00019571401752467945, cost_est=34.33651951754235)]
我想要做的是检索第一列 (angle_est
) 的每个值,并将其作为参数 xMisallignment
传递给定义的函数以设置特定的 属性 class 对象。定义的函数是:
def setMisAllignment(self, xMisallignment):
if np.abs(xMisallignment) > 0.8:
warnings.warn('You might set misallignment angle too large.')
self.MisAllignment = xMisallignment
我正在尝试 select 第一列并将其转换为 rdd,并将上述函数应用于 map() 函数,但它似乎不起作用, MisAllignment
做到了无论如何都不会改变。
df.select(df.angle_est).rdd.map(lambda row: model0.setMisAllignment(row))
In[2]: model0.MisAllignment
Out[2]: 0.00111511718224
任何人有想法帮助我让那个功能工作?提前致谢!
您可以将您的函数注册为类似于以下内容的 spark UDF:
spark.udf.register("misallign", setMisAllignment)
您可以在此测试套件中获得许多创建和注册 UDF 的示例: https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
希望它能回答您的问题