如何将额外参数传递给 Spark SQL 中的 UDF?
How can I pass extra parameters to UDFs in Spark SQL?
我想解析 DataFrame
中的日期列,并且对于每个日期列,日期的分辨率可能会改变(即 2011/01/10 => 2011 /01 如果设置了分辨率"Month")。
我写了下面的代码:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
然而它不起作用。看来我只能将 Column
s 传递给 UDF。我想知道如果我将 DataFrame
转换为 RDD
并在每一行上应用该函数是否会很慢。
有谁知道正确的解决方法吗?谢谢!
只需使用一点柯里化:
def convertDateFunc(resolution: DateResolutionType) = udf((x:String) =>
SparkDateTimeConverter.convertDate(x, resolution))
并按如下方式使用它:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
在旁注中,您应该看一下 sql.functions.trunc
和 sql.functions.date_format
。这些应该至少是工作的一部分,根本不需要使用 UDF。
注:
在 Spark 2.2 或更高版本中,您可以使用 typedLit
函数:
import org.apache.spark.sql.functions.typedLit
支持更广泛的文字,如 Seq
或 Map
。
您可以创建文字 Column
以使用 org.apache.spark.sql.functions
中定义的 lit(...)
函数传递给 udf
例如:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
我想解析 DataFrame
中的日期列,并且对于每个日期列,日期的分辨率可能会改变(即 2011/01/10 => 2011 /01 如果设置了分辨率"Month")。
我写了下面的代码:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
然而它不起作用。看来我只能将 Column
s 传递给 UDF。我想知道如果我将 DataFrame
转换为 RDD
并在每一行上应用该函数是否会很慢。
有谁知道正确的解决方法吗?谢谢!
只需使用一点柯里化:
def convertDateFunc(resolution: DateResolutionType) = udf((x:String) =>
SparkDateTimeConverter.convertDate(x, resolution))
并按如下方式使用它:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
在旁注中,您应该看一下 sql.functions.trunc
和 sql.functions.date_format
。这些应该至少是工作的一部分,根本不需要使用 UDF。
注:
在 Spark 2.2 或更高版本中,您可以使用 typedLit
函数:
import org.apache.spark.sql.functions.typedLit
支持更广泛的文字,如 Seq
或 Map
。
您可以创建文字 Column
以使用 org.apache.spark.sql.functions
lit(...)
函数传递给 udf
例如:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))