How to do a window function inside a when?
I have a dataframe in which I am trying to run a window function over an array column.
The logic is as follows: group by (or window-partition on) the id and filtered columns. Compute the maximum score over the rows whose types column is null; otherwise take the row's own score. When a row's score is not equal to the group's maximum score, set "NA" in the types column.
val data = spark.createDataFrame(Seq(
(1, "shirt for women", Seq("shirt", "women"), 19.1, "ST"),
(1, "shirt for women", Seq("shirt", "women"), 10.1, null),
(1, "shirt for women", Seq("shirt", "women"), 12.1, null),
(0, "shirt group women", Seq("group", "women"), 15.1, null),
(0, "shirt group women", Seq("group", "women"), 12.1, null),
(3, "shirt nmn women", Seq("shirt", "women"), 16.1, "ST"),
(3, "shirt were women", Seq("shirt", "women"), 13.1, "ST")
)).toDF("id", "raw", "filtered", "score", "types")
+---+-----------------+--------------+-----+-----+
|id |raw |filtered |score|types|
+---+-----------------+--------------+-----+-----+
|1 |shirt for women |[shirt, women]|19.1 |ST |
|1 |shirt for women |[shirt, women]|10.1 |null |
|1 |shirt for women |[shirt, women]|12.1 |null |
|0 |shirt group women|[group, women]|15.1 |null |
|0 |shirt group women|[group, women]|12.1 |null |
|3 |shirt nmn women |[shirt, women]|16.1 |ST |
|3 |shirt were women |[shirt, women]|13.1 |ST |
+---+-----------------+--------------+-----+-----+
Expected output:
+---+-----------------+--------------+-----+-----+
|id |raw              |filtered      |score|types|
+---+-----------------+--------------+-----+-----+
|1  |shirt for women  |[shirt, women]|19.1 |ST   |
|1  |shirt for women  |[shirt, women]|10.1 |NA   |
|1  |shirt for women  |[shirt, women]|12.1 |null |
|0  |shirt group women|[group, women]|15.1 |null |
|0  |shirt group women|[group, women]|12.1 |NA   |
|3  |shirt nmn women  |[shirt, women]|16.1 |ST   |
|3  |shirt were women |[shirt, women]|13.1 |ST   |
+---+-----------------+--------------+-----+-----+
I tried:
data.withColumn("max_score",
    when(col("types").isNull,
      max("score").over(Window.partitionBy("id", "filtered")))
      .otherwise($"score"))
  .withColumn("type_temp",
    when(col("score") =!= col("max_score"),
      addReasonsUDF(col("types"), lit("NA")))
      .otherwise(col("types")))
  .drop("types", "max_score")
  .withColumnRenamed("type_temp", "types")
But it doesn't work. It gives me:
+---+-----------------+--------------+-----+---------+-----+
|id |raw |filtered |score|max_score|types|
+---+-----------------+--------------+-----+---------+-----+
|1 |shirt for women |[shirt, women]|19.1 |19.1 |ST |
|1 |shirt women |[shirt, women]|10.1 |19.1 |NA |
|1 |shirt of women |[shirt, women]|12.1 |19.1 |NA |
|0 |shirt group women|[group, women]|15.1 |15.1 |null |
|0 |shirt will women |[group, women]|12.1 |15.1 |NA |
|3 |shirt nmn women |[shirt, women]|16.1 |16.1 |ST |
|3 |shirt were women |[shirt, women]|13.1 |13.1 |ST |
+---+-----------------+--------------+-----+---------+-----+
Can someone tell me what I am doing wrong?
I think there is a problem with my window function; it also doesn't work correctly when I try to partition on the id and raw columns. So neither string nor array partitioning works.
dataSet.withColumn("max_score",
when(col("types").isNull,
max("score").over(Window.partitionBy("id", "raw")))
.otherwise($"score")).show(false)
+---+-----------------+--------------+-----+-----+---------+
|id |raw |filtered |score|types|max_score|
+---+-----------------+--------------+-----+-----+---------+
|3 |shirt nmn women |[shirt, women]|16.1 |ST |16.1 |
|0 |shirt group women|[group, women]|15.1 |null |15.1 |
|0 |shirt group women|[group, women]|12.1 |null |15.1 |
|3 |shirt were women |[shirt, women]|13.1 |ST |13.1 |
|1 |shirt for women |[shirt, women]|19.1 |ST |19.1 |
|1 |shirt for women |[shirt, women]|10.1 |null |19.1 |
|1 |shirt for women |[shirt, women]|12.1 |null |19.1 |
+---+-----------------+--------------+-----+-----+---------+
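What this table shows is that the window aggregate is evaluated over every row in the partition before the when decides which rows receive it; the null-types rows therefore see the group-wide maximum (19.1 for id 1) rather than the maximum over the null-types rows only (12.1). A plain-Scala sketch of that difference for the id = 1 group (illustrative values, not Spark API):

```scala
// Scores for id = 1, paired with their types value (None models a null).
val scores = Seq((19.1, Some("ST")), (10.1, None), (12.1, None))

// Like max("score").over(Window.partitionBy(...)): every row feeds the max,
// regardless of any when(...) wrapped around the window expression.
val maxAll = scores.map(_._1).max                            // 19.1

// What the question actually wants: the max over null-types rows only.
val maxNullOnly = scores.collect { case (s, None) => s }.max // 12.1
```

The when around the window only chooses which rows *display* the result; it does not restrict which rows the window aggregates over.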
You don't need the window function inside the when expression; instead you can do it in two stages. First, add the maximum score of each group as a new column, partitioning by the id, filtered, and types columns. This yields the maximum score specifically for the group where types is null. A window expression is preferable here since the other columns should be preserved.
After that, replace types with "NA" wherever types has a null value and the maximum score is not equal to score.
In code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, when}

val w = Window.partitionBy("id", "filtered", "types")
val df = data.withColumn("max_score", max($"score").over(w))
  .withColumn("types", when($"types".isNull && $"score" =!= $"max_score", "NA").otherwise($"types"))
  .drop("max_score")
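To sanity-check the rule without a Spark session, here is a plain-Scala model of the same logic (the Row case class and tagNonMax helper are illustrative names, not Spark API): partition by (id, filtered, types), take the maximum score per partition, and mark a row "NA" when it is a null-types row that does not hold the partition maximum.

```scala
// Plain-Scala model of the windowed logic; Option[String] models a nullable column.
case class Row(id: Int, raw: String, filtered: Seq[String], score: Double, types: Option[String])

def tagNonMax(rows: Seq[Row]): Seq[Row] = {
  // Max score per (id, filtered, types) partition, mirroring Window.partitionBy.
  val maxScore: Map[(Int, Seq[String], Option[String]), Double] =
    rows.groupBy(r => (r.id, r.filtered, r.types))
        .map { case (key, rs) => key -> rs.map(_.score).max }
  rows.map { r =>
    val isMax = r.score == maxScore((r.id, r.filtered, r.types))
    // Only null-types rows that miss the partition maximum become "NA".
    if (r.types.isEmpty && !isMax) r.copy(types = Some("NA")) else r
  }
}
```

Applied to the sample data, this marks the (1, 10.1) and (0, 12.1) rows as "NA" and leaves the per-partition maxima (12.1 for id 1, 15.1 for id 0) as null, matching the expected output.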