如何将额外参数传递给 Spark SQL 中的 UDF?

How can I pass extra parameters to UDFs in Spark SQL?

我想解析 DataFrame 中的日期列,并且对于每个日期列,日期的分辨率可能会改变(即 2011/01/10 => 2011 /01 如果设置了分辨率"Month")。

我写了下面的代码:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}

然而它不起作用。看来我只能将 Columns 传递给 UDF。我想知道如果我将 DataFrame 转换为 RDD 并在每一行上应用该函数是否会很慢。

有谁知道正确的解决方法吗?谢谢!

只需使用一点柯里化:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))

并按如下方式使用它:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))

在旁注中,您应该看一下 sql.functions.truncsql.functions.date_format。这些应该至少是工作的一部分,根本不需要使用 UDF。

:

在 Spark 2.2 或更高版本中,您可以使用 typedLit 函数:

import org.apache.spark.sql.functions.typedLit

支持更广泛的文字,如 SeqMap

您可以创建文字 Column 以使用 org.apache.spark.sql.functions

中定义的 lit(...) 函数传递给 udf

例如:

val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))