具有动态数据类型的 UDF

UDF with Dynamic Data Type

我正在尝试编写可以从 Map 中删除几个键的 udf。但是Map的key和value的类型是不固定的,可以是String或者Array什么的。我应该如何定义这样的udf。我正在使用 Spark 版本 2.4.4。

下面是我的 Map[String, string] 的 udf:

val mapKeys = //Seq[String]
val mapFilterUdf = udf[Map[String, String], Map[String, String]] {
    map => map.filter{case (key, _) => mapKeys.contains(key)}
}
mapFilterUdf(dataFrame.col("column_name")).as(column.name)

你可以为 udf 做一个通用的工厂方法:

import scala.reflect.runtime.universe._

def filterUdfFactory[T](mapKeys:Seq[T])(implicit tag:TypeTag[T]) = udf((map:Map[T,T]) => map.filter{case (k,v) => mapKeys.contains(k)})

然后用作例如对于字符串:

val mapKeys = Seq("k1")

val tt = typeTag[String]
val filterUdf = filterUdfFactory[String](mapKeys)

 val df = Seq(
    Map("k1" -> "v1","k2" -> "v2")
 ).toDF("map")

 df.select(filterUdf($"map"))
.show()

给出:

+----------+
|  UDF(map)|
+----------+
|[k1 -> v1]|
+----------+

仅当您将列的运行时模式作为 udf 的第二个参数时,您才能在 UDF 中使用 Any:

val mapKeys : Seq[Any] = Seq("k1")

val df = Seq(
    Map("k1" -> "v1","k2" -> "v2")
).toDF("map")

val colSchema = df.select($"map").schema.head.dataType

val filterUdf = udf((map:Map[Any,Any]) => map.filter{case (k:Any,v:Any) => mapKeys.contains(k)},colSchema)

df
.select(filterUdf($"map"))
.show()

给予

+----------+
|  UDF(map)|
+----------+
|[k1 -> v1]|
+----------+

这项工作也适用于 Row,请参阅: