跳过当前行 COUNT 并使用 Spark Dataframe 总结当前键的其他 COUNTS
Skip the current row COUNT and sum up the other COUNTS for current key with Spark Dataframe
我的输入:
val df = sc.parallelize(Seq(
("0","car1", "success"),
("0","car1", "success"),
("0","car3", "success"),
("0","car2", "success"),
("1","car1", "success"),
("1","car2", "success"),
("0","car3", "success")
)).toDF("id", "item", "status")
我的中间组输出如下所示:
val df2 = df.groupBy("id", "item").agg(count("item").alias("occurences"))
+---+----+----------+
| id|item|occurences|
+---+----+----------+
| 0|car3| 2|
| 0|car2| 1|
| 0|car1| 2|
| 1|car2| 1|
| 1|car1| 1|
+---+----+----------+
我想要的输出是:
正在计算项目 出现的总和 跳过当前 id 项目的出现值 。
例如在下面的输出table中,car3出现了id "0" 2次,car 2出现了1次car 1出现了2次
因此,对于 id“0”,其 "car3" 项目的其他出现次数之和将为 car2(1) + car1(2) = 3 的值。
对于相同的 id“0”,其 "car2" 项的其他出现次数之和将为 car3(2) + car1(2) = 4 的值。
其余部分继续如此。示例输出
+---+----+----------+----------------------+
| id|item|occurences| other_occurences_sum |
+---+----+----------+----------------------+
| 0|car3| 2| 3 |<- (car2+car1) for id 0
| 0|car2| 1| 4 |<- (car3+car1) for id 0
| 0|car1| 2| 3 |<- (car3+car2) for id 0
| 1|car2| 1| 1 |<- (car1) for id 1
| 1|car1| 1| 1 |<- (car2) for id 1
+---+----+----------+----------------------+
这是 window 函数的完美目标。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val w = Window.partitionBy("id")
df2.withColumn(
"other_occurences_sum", sum($"occurences").over(w) - $"occurences"
).show
// +---+----+----------+--------------------+
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// | 0|car3| 2| 3|
// | 0|car2| 1| 4|
// | 0|car1| 2| 3|
// | 1|car2| 1| 1|
// | 1|car1| 1| 1|
// +---+----+----------+--------------------+
其中 sum($"occurences").over(w)
是当前 ID 所有出现次数的总和。当然join
也是有效的:
df2.join(
df2.groupBy("id").agg(sum($"occurences") as "total"), Seq("id")
).select(
$"*", ($"total" - $"occurences") as "other_occurences_sum"
).show
// +---+----+----------+--------------------+
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// | 0|car3| 2| 3|
// | 0|car2| 1| 4|
// | 0|car1| 2| 3|
// | 1|car2| 1| 1|
// | 1|car1| 1| 1|
// +---+----+----------+--------------------+
我的输入:
val df = sc.parallelize(Seq(
("0","car1", "success"),
("0","car1", "success"),
("0","car3", "success"),
("0","car2", "success"),
("1","car1", "success"),
("1","car2", "success"),
("0","car3", "success")
)).toDF("id", "item", "status")
我的中间组输出如下所示:
val df2 = df.groupBy("id", "item").agg(count("item").alias("occurences"))
+---+----+----------+
| id|item|occurences|
+---+----+----------+
| 0|car3| 2|
| 0|car2| 1|
| 0|car1| 2|
| 1|car2| 1|
| 1|car1| 1|
+---+----+----------+
我想要的输出是: 正在计算项目 出现的总和 跳过当前 id 项目的出现值 。
例如在下面的输出table中,car3出现了id "0" 2次,car 2出现了1次car 1出现了2次
因此,对于 id“0”,其 "car3" 项目的其他出现次数之和将为 car2(1) + car1(2) = 3 的值。
对于相同的 id“0”,其 "car2" 项的其他出现次数之和将为 car3(2) + car1(2) = 4 的值。
其余部分继续如此。示例输出
+---+----+----------+----------------------+
| id|item|occurences| other_occurences_sum |
+---+----+----------+----------------------+
| 0|car3| 2| 3 |<- (car2+car1) for id 0
| 0|car2| 1| 4 |<- (car3+car1) for id 0
| 0|car1| 2| 3 |<- (car3+car2) for id 0
| 1|car2| 1| 1 |<- (car1) for id 1
| 1|car1| 1| 1 |<- (car2) for id 1
+---+----+----------+----------------------+
这是 window 函数的完美目标。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val w = Window.partitionBy("id")
df2.withColumn(
"other_occurences_sum", sum($"occurences").over(w) - $"occurences"
).show
// +---+----+----------+--------------------+
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// | 0|car3| 2| 3|
// | 0|car2| 1| 4|
// | 0|car1| 2| 3|
// | 1|car2| 1| 1|
// | 1|car1| 1| 1|
// +---+----+----------+--------------------+
其中 sum($"occurences").over(w)
是当前 ID 所有出现次数的总和。当然join
也是有效的:
df2.join(
df2.groupBy("id").agg(sum($"occurences") as "total"), Seq("id")
).select(
$"*", ($"total" - $"occurences") as "other_occurences_sum"
).show
// +---+----+----------+--------------------+
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// | 0|car3| 2| 3|
// | 0|car2| 1| 4|
// | 0|car1| 2| 3|
// | 1|car2| 1| 1|
// | 1|car1| 1| 1|
// +---+----+----------+--------------------+