Return 所有列 + map 函数使用的 UDF 中的更多列
Return all columns + a few more in a UDF used by the map function
我正在使用映射函数生成一个新列,其值取决于数据框中已存在的列的结果。
def computeTechFields(row):
if row.col1!=VALUE_TO_COMPARE:
tech1=0
else:
tech1=1
return (row.col1, row.col2, row.col3, tech1)
delta2rdd = delta.map(computeTechFields)
问题是我的主数据框有超过 150 列,我必须 return 使用 map 函数所以最后我有这样的东西:
return (row.col1, row.col2, row.col3, row.col4, row.col5, row.col6, row.col7, row.col8, row.col9, row.col10, row.col11, row.col12, row.col13, row.col14, row.col15, row.col16, row.col17, row.col18 ..... row.col149, row.col150, row.col151, tech1)
如你所见,写起来真的很长,很难读。所以我试着做这样的事情:
return (row.*, tech1)
当然没用。
我知道 "withColumn" 函数存在,但我对其性能知之甚少,无论如何也无法使它工作。
编辑(withColumn 函数发生了什么):
def computeTech1(row):
if row.col1!=VALUE_TO_COMPARE:
tech1=0
else:
tech1=1
return tech1
delta2 = delta.withColumn("tech1", computeTech1)
它给了我这个错误:
AssertionError: col should be Column
我试过这样做:
return col(tech1)
还是一样的错误
我也试过了:
delta2 = delta.withColumn("tech1", col(computeTech1))
这一次,错误是:
AttributeError: 'function' object has no attribute '_get_object_id'
编辑结束
所以我的问题是,我如何 return 我的 UDF 中的所有列 + map 函数使用的更多列?
谢谢!
对 Python 不太确定,所以人们可能会纠正我这里的语法,但一般的想法是让你的函数成为一个 UDF,以一列作为输入,然后在 [=11= 中调用它].我在这里使用了 lambda,但经过一些调整,它也应该可以与函数一起使用。
from pyspark.sql.functions import udf
computeTech1UDF = udf(
lambda col: 0 if col != VALUE_TO_COMPARE else 1, IntegerType())
delta2 = delta.withColumn("tech1", computeTech1UDF(col1))
由于您没有为 withColumn
提供列表达式(参见 http://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn),因此您尝试的方法无效。使用 UDF 包装器可以实现这一点。
我正在使用映射函数生成一个新列,其值取决于数据框中已存在的列的结果。
def computeTechFields(row):
if row.col1!=VALUE_TO_COMPARE:
tech1=0
else:
tech1=1
return (row.col1, row.col2, row.col3, tech1)
delta2rdd = delta.map(computeTechFields)
问题是我的主数据框有超过 150 列,我必须 return 使用 map 函数所以最后我有这样的东西:
return (row.col1, row.col2, row.col3, row.col4, row.col5, row.col6, row.col7, row.col8, row.col9, row.col10, row.col11, row.col12, row.col13, row.col14, row.col15, row.col16, row.col17, row.col18 ..... row.col149, row.col150, row.col151, tech1)
如你所见,写起来真的很长,很难读。所以我试着做这样的事情:
return (row.*, tech1)
当然没用。
我知道 "withColumn" 函数存在,但我对其性能知之甚少,无论如何也无法使它工作。
编辑(withColumn 函数发生了什么):
def computeTech1(row):
if row.col1!=VALUE_TO_COMPARE:
tech1=0
else:
tech1=1
return tech1
delta2 = delta.withColumn("tech1", computeTech1)
它给了我这个错误:
AssertionError: col should be Column
我试过这样做:
return col(tech1)
还是一样的错误
我也试过了:
delta2 = delta.withColumn("tech1", col(computeTech1))
这一次,错误是:
AttributeError: 'function' object has no attribute '_get_object_id'
编辑结束
所以我的问题是,我如何 return 我的 UDF 中的所有列 + map 函数使用的更多列?
谢谢!
对 Python 不太确定,所以人们可能会纠正我这里的语法,但一般的想法是让你的函数成为一个 UDF,以一列作为输入,然后在 [=11= 中调用它].我在这里使用了 lambda,但经过一些调整,它也应该可以与函数一起使用。
from pyspark.sql.functions import udf
computeTech1UDF = udf(
lambda col: 0 if col != VALUE_TO_COMPARE else 1, IntegerType())
delta2 = delta.withColumn("tech1", computeTech1UDF(col1))
由于您没有为 withColumn
提供列表达式(参见 http://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn),因此您尝试的方法无效。使用 UDF 包装器可以实现这一点。