Spark SQL - Scala - 聚合函数作为参数创建 DF 列
Spark SQL - Scala - Aggregate Function as Parameter to Create DF Column
我正在尝试创建一个函数,我在其中作为主要参数传递:
- 一个数据帧
- 另一个函数(聚合:count、countDistinct、max 等)
我的目标是 return 一个基于所提供函数的具有新列的 DataFrame。
不过我打字有问题。我一直在这里搜索,我发现的大部分内容都指向 UDF,并且需要创建它以便在“withColumn”中应用它。
当我运行这样的事情时:
val DF1 = Seq(
("asd", "1", "search", "otpx"),
("asd", "1", "xpto", "otpx"),
("asd", "2", "xpto", "otpx"),
("asd", "3", "xpto", "otpx"),
("asd", "3", "search", "otpx"),
("asd", "4", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "2", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("qwe", "1", "xpto", "otpx"),
("qwe", "2", "xpto", "otpx"),
("qwe", "3", "xpto", "otpx"),
("qwe", "4", "xpto", "otpx"),
("qwe", "5", "xpto", "otpx")
).toDF("cid", "cts", "type", "subtype")
DF1.show(100)
val canList = List("cid", "cts")
def test[T](df: DataFrame, fn: Column => T, newColName: String, colToFn: String, partitionByColumns: List[String]): DataFrame = {
val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
val fun: (Column => T) = (arg: Column) => fn(arg) // or right away udfFun = udf(fn)
val udfFun = udf(fun)
val ret = df.withColumn(newColName, udfFun(df(colToFn)).over(window))
ret
}
val DF2 = test(DF1, countDistinct, "count_type", "type", canList)
DF2.orderBy(canList.head, canList.tail:_*).show(100)
我收到如下错误:
No TypeTag available for T
val udfFun = udf(fun)
我在这里错过了什么?
提前致谢,干杯!
首先请注意 countDistinct
不支持 Window。如果你想定义一个函数,在 window 上接受其他聚合函数(比如 count
),你可以将 fn
定义为一个接受列和 returns 的函数柱子。 UDF 在这里不合适,因为您调用的是 Spark SQL 函数,而不是自定义 Scala 函数。
def test(df: DataFrame,
fn: Column => Column,
newColName: String,
colToFn: String,
partitionByColumns: List[String]
): DataFrame = {
val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
val ret = df.withColumn(newColName, fn(col(colToFn)).over(window))
ret
}
// calling the function
test(DF1, count, "count_type", "type", canList)
我正在尝试创建一个函数,我在其中作为主要参数传递:
- 一个数据帧
- 另一个函数(聚合:count、countDistinct、max 等)
我的目标是 return 一个基于所提供函数的具有新列的 DataFrame。
不过我打字有问题。我一直在这里搜索,我发现的大部分内容都指向 UDF,并且需要创建它以便在“withColumn”中应用它。
当我运行这样的事情时:
val DF1 = Seq(
("asd", "1", "search", "otpx"),
("asd", "1", "xpto", "otpx"),
("asd", "2", "xpto", "otpx"),
("asd", "3", "xpto", "otpx"),
("asd", "3", "search", "otpx"),
("asd", "4", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "2", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("qwe", "1", "xpto", "otpx"),
("qwe", "2", "xpto", "otpx"),
("qwe", "3", "xpto", "otpx"),
("qwe", "4", "xpto", "otpx"),
("qwe", "5", "xpto", "otpx")
).toDF("cid", "cts", "type", "subtype")
DF1.show(100)
val canList = List("cid", "cts")
def test[T](df: DataFrame, fn: Column => T, newColName: String, colToFn: String, partitionByColumns: List[String]): DataFrame = {
val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
val fun: (Column => T) = (arg: Column) => fn(arg) // or right away udfFun = udf(fn)
val udfFun = udf(fun)
val ret = df.withColumn(newColName, udfFun(df(colToFn)).over(window))
ret
}
val DF2 = test(DF1, countDistinct, "count_type", "type", canList)
DF2.orderBy(canList.head, canList.tail:_*).show(100)
我收到如下错误:
No TypeTag available for T
val udfFun = udf(fun)
我在这里错过了什么?
提前致谢,干杯!
首先请注意 countDistinct
不支持 Window。如果你想定义一个函数,在 window 上接受其他聚合函数(比如 count
),你可以将 fn
定义为一个接受列和 returns 的函数柱子。 UDF 在这里不合适,因为您调用的是 Spark SQL 函数,而不是自定义 Scala 函数。
def test(df: DataFrame,
fn: Column => Column,
newColName: String,
colToFn: String,
partitionByColumns: List[String]
): DataFrame = {
val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
val ret = df.withColumn(newColName, fn(col(colToFn)).over(window))
ret
}
// calling the function
test(DF1, count, "count_type", "type", canList)