具有动态数据类型的 UDF
UDF with Dynamic Data Type
我正在尝试编写可以从 Map 中删除几个键的 udf。但是Map的key和value的类型是不固定的,可以是String或者Array什么的。我应该如何定义这样的udf。我正在使用 Spark 版本 2.4.4。
下面是我的 Map[String, string] 的 udf:
val mapKeys = //Seq[String]
val mapFilterUdf = udf[Map[String, String], Map[String, String]] {
map => map.filter{case (key, _) => mapKeys.contains(key)}
}
mapFilterUdf(dataFrame.col("column_name")).as(column.name)
你可以为 udf 做一个通用的工厂方法:
import scala.reflect.runtime.universe._
def filterUdfFactory[T](mapKeys:Seq[T])(implicit tag:TypeTag[T]) = udf((map:Map[T,T]) => map.filter{case (k,v) => mapKeys.contains(k)})
然后用作例如对于字符串:
val mapKeys = Seq("k1")
val tt = typeTag[String]
val filterUdf = filterUdfFactory[String](mapKeys)
val df = Seq(
Map("k1" -> "v1","k2" -> "v2")
).toDF("map")
df.select(filterUdf($"map"))
.show()
给出:
+----------+
| UDF(map)|
+----------+
|[k1 -> v1]|
+----------+
仅当您将列的运行时模式作为 udf
的第二个参数时,您才能在 UDF 中使用 Any
:
val mapKeys : Seq[Any] = Seq("k1")
val df = Seq(
Map("k1" -> "v1","k2" -> "v2")
).toDF("map")
val colSchema = df.select($"map").schema.head.dataType
val filterUdf = udf((map:Map[Any,Any]) => map.filter{case (k:Any,v:Any) => mapKeys.contains(k)},colSchema)
df
.select(filterUdf($"map"))
.show()
给予
+----------+
| UDF(map)|
+----------+
|[k1 -> v1]|
+----------+
这项工作也适用于 Row
,请参阅:
我正在尝试编写可以从 Map 中删除几个键的 udf。但是Map的key和value的类型是不固定的,可以是String或者Array什么的。我应该如何定义这样的udf。我正在使用 Spark 版本 2.4.4。
下面是我的 Map[String, string] 的 udf:
val mapKeys = //Seq[String]
val mapFilterUdf = udf[Map[String, String], Map[String, String]] {
map => map.filter{case (key, _) => mapKeys.contains(key)}
}
mapFilterUdf(dataFrame.col("column_name")).as(column.name)
你可以为 udf 做一个通用的工厂方法:
import scala.reflect.runtime.universe._
def filterUdfFactory[T](mapKeys:Seq[T])(implicit tag:TypeTag[T]) = udf((map:Map[T,T]) => map.filter{case (k,v) => mapKeys.contains(k)})
然后用作例如对于字符串:
val mapKeys = Seq("k1")
val tt = typeTag[String]
val filterUdf = filterUdfFactory[String](mapKeys)
val df = Seq(
Map("k1" -> "v1","k2" -> "v2")
).toDF("map")
df.select(filterUdf($"map"))
.show()
给出:
+----------+
| UDF(map)|
+----------+
|[k1 -> v1]|
+----------+
仅当您将列的运行时模式作为 udf
的第二个参数时,您才能在 UDF 中使用 Any
:
val mapKeys : Seq[Any] = Seq("k1")
val df = Seq(
Map("k1" -> "v1","k2" -> "v2")
).toDF("map")
val colSchema = df.select($"map").schema.head.dataType
val filterUdf = udf((map:Map[Any,Any]) => map.filter{case (k:Any,v:Any) => mapKeys.contains(k)},colSchema)
df
.select(filterUdf($"map"))
.show()
给予
+----------+
| UDF(map)|
+----------+
|[k1 -> v1]|
+----------+
这项工作也适用于 Row
,请参阅: