如果条件适合 Spark Scala,则将文字值设置为 Window
Set literal value over Window if condition suited Spark Scala
我需要检查 window 的条件:
- 如果IND_DEF列是20,那么我想把这个寄存器所属的window列premium的值改成1.
我的初始数据框如下所示:
+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
| 1| BK| null| KT| 40|
| 1| AK| -31| null| 30|
| 1| VZ| null| IL| 20|
| 2| VK| 32| LI| 7|
| 2| CK| 25| YNZ| 10|
| 2| CK| 0| null| 5|
| 2| VK| 30| IL| 25|
+--------+----+-------+-----+-------+
我想实现这个:
+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
| 1| BK| 1| KT| 40|
| 1| AK| 1| null| 30|
| 1| VZ| 1| IL| 20|
| 2| VK| 32| LI| 7|
| 2| CK| 25| YNZ| 10|
| 2| CK| 0| null| 5|
| 2| VK| 30| IL| 25|
+--------+----+-------+-----+-------+
我正在尝试以下代码但不起作用...
val df_946 = Seq [(Int, String, Integer, String, Int)]((1,"VZ",null,"IL",20),(1, "AK", -31,null,30),(1,"BK", null,"KT",40),(2,"CK",0,null,5),(2,"CK",25,"YNZ",10),(2,"VK",30,"IL",25),(2,"VK",32,"LI",7)).toDF("policyId", "name", "premium", "state","IND_DEF").orderBy("policyId")
val winSpec = Window.partitionBy("policyId").orderBy("policyId")
val df_947 = df_946.withColumn("premium",when(col("IND_DEF") === 20,lit(1).over(winSpec)).otherwise(col("premium")))
您可以通过 collect_list
为每个 window 分区生成一个 IND_DEF
值数组,并根据 array_contains
条件重新创建列 premium
:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, None, 40),
(1, Some(-31), 30),
(1, None, 20),
(2, Some(32), 7),
(2, Some(30), 10)
).toDF("policyId", "premium", "IND_DEF")
val win = Window.partitionBy($"policyId")
df.
withColumn("indList", collect_list($"IND_DEF").over(win)).
withColumn("premium", when(array_contains($"indList", 20), 1).otherwise($"premium")).
drop($"indList").
show
// +--------+-------+-------+
// |policyId|premium|IND_DEF|
// +--------+-------+-------+
// | 1| 1| 40|
// | 1| 1| 30|
// | 1| 1| 20|
// | 2| 32| 7|
// | 2| 30| 10|
// +--------+-------+-------+
我需要检查 window 的条件: - 如果IND_DEF列是20,那么我想把这个寄存器所属的window列premium的值改成1.
我的初始数据框如下所示:
+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
| 1| BK| null| KT| 40|
| 1| AK| -31| null| 30|
| 1| VZ| null| IL| 20|
| 2| VK| 32| LI| 7|
| 2| CK| 25| YNZ| 10|
| 2| CK| 0| null| 5|
| 2| VK| 30| IL| 25|
+--------+----+-------+-----+-------+
我想实现这个:
+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
| 1| BK| 1| KT| 40|
| 1| AK| 1| null| 30|
| 1| VZ| 1| IL| 20|
| 2| VK| 32| LI| 7|
| 2| CK| 25| YNZ| 10|
| 2| CK| 0| null| 5|
| 2| VK| 30| IL| 25|
+--------+----+-------+-----+-------+
我正在尝试以下代码但不起作用...
val df_946 = Seq [(Int, String, Integer, String, Int)]((1,"VZ",null,"IL",20),(1, "AK", -31,null,30),(1,"BK", null,"KT",40),(2,"CK",0,null,5),(2,"CK",25,"YNZ",10),(2,"VK",30,"IL",25),(2,"VK",32,"LI",7)).toDF("policyId", "name", "premium", "state","IND_DEF").orderBy("policyId")
val winSpec = Window.partitionBy("policyId").orderBy("policyId")
val df_947 = df_946.withColumn("premium",when(col("IND_DEF") === 20,lit(1).over(winSpec)).otherwise(col("premium")))
您可以通过 collect_list
为每个 window 分区生成一个 IND_DEF
值数组,并根据 array_contains
条件重新创建列 premium
:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, None, 40),
(1, Some(-31), 30),
(1, None, 20),
(2, Some(32), 7),
(2, Some(30), 10)
).toDF("policyId", "premium", "IND_DEF")
val win = Window.partitionBy($"policyId")
df.
withColumn("indList", collect_list($"IND_DEF").over(win)).
withColumn("premium", when(array_contains($"indList", 20), 1).otherwise($"premium")).
drop($"indList").
show
// +--------+-------+-------+
// |policyId|premium|IND_DEF|
// +--------+-------+-------+
// | 1| 1| 40|
// | 1| 1| 30|
// | 1| 1| 20|
// | 2| 32| 7|
// | 2| 30| 10|
// +--------+-------+-------+