Spark UDF - 任务不可序列化异常
Spark UDF - Task not serializable exception
我正在尝试使用以下 Scala 代码创建 UDF
lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))
val validateudf = udf((id: Int) => {
if(id == 1){
"ID IS EQUAL TO 1"
}
else if(id > 1){
validateId(id)
}
else{
"NO VALID RECORDS"
}
})
def validateId(id:Int) : String = {
if (id > 2) {
"ID IS GREATER THAN 2"
}
else {
"VALID RECORDS"
}
}
当我 运行 这段代码时,我收到任务不可序列化异常。
有什么想法吗?谢谢
udf
被视为一个黑盒,需要对传递的列进行序列化和反序列化,因此当您有内置函数的替代方案时,不建议使用 udf
。
用 withColumn
调用 udf
函数没问题,但是您从 udf
函数内部调用了另一个函数 validateId
导致了问题。
我建议您完全不要使用 udf
函数,因为您只需使用 when
内置函数就可以达到要求。
import org.apache.spark.sql.functions._
val formattedDF2 = df.withColumn("result_col", when($"id" === 1, lit("ID IS EQUAL TO 1")).otherwise(when($"id" > 2, lit("ID IS GREATER THAN 2")).otherwise(when($"id" > 1, lit("VALID RECORDS")).otherwise(lit("NO VALID RECORDS")))))
我正在尝试使用以下 Scala 代码创建 UDF
lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))
val validateudf = udf((id: Int) => {
if(id == 1){
"ID IS EQUAL TO 1"
}
else if(id > 1){
validateId(id)
}
else{
"NO VALID RECORDS"
}
})
def validateId(id:Int) : String = {
if (id > 2) {
"ID IS GREATER THAN 2"
}
else {
"VALID RECORDS"
}
}
当我 运行 这段代码时,我收到任务不可序列化异常。
有什么想法吗?谢谢
udf
被视为一个黑盒,需要对传递的列进行序列化和反序列化,因此当您有内置函数的替代方案时,不建议使用 udf
。
用 withColumn
调用 udf
函数没问题,但是您从 udf
函数内部调用了另一个函数 validateId
导致了问题。
我建议您完全不要使用 udf
函数,因为您只需使用 when
内置函数就可以达到要求。
import org.apache.spark.sql.functions._
val formattedDF2 = df.withColumn("result_col", when($"id" === 1, lit("ID IS EQUAL TO 1")).otherwise(when($"id" > 2, lit("ID IS GREATER THAN 2")).otherwise(when($"id" > 1, lit("VALID RECORDS")).otherwise(lit("NO VALID RECORDS")))))