Skip the current row's COUNT and sum up the other COUNTs for the current key with a Spark DataFrame

My input:

val df = sc.parallelize(Seq(
  ("0","car1", "success"),
  ("0","car1", "success"),
  ("0","car3", "success"),
  ("0","car2", "success"),
  ("1","car1", "success"),
  ("1","car2", "success"),
  ("0","car3", "success")
)).toDF("id", "item", "status")
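
This runs as-is in spark-shell, where sc and the toDF implicits are already in scope. Outside the shell you would need roughly the following setup first (a minimal sketch; the session and app names are arbitrary, not part of the question):

import org.apache.spark.sql.SparkSession

// Assumed setup when not using spark-shell: build a local session,
// grab the SparkContext, and bring the implicits into scope.
val spark = SparkSession.builder()
  .appName("other-occurrences-sum")  // hypothetical app name
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext
import spark.implicits._  // enables .toDF(...) and the $"col" syntax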

My intermediate grouped output looks like this:

val df2 = df.groupBy("id", "item").agg(count("item").alias("occurences"))
+---+----+----------+
| id|item|occurences|
+---+----+----------+
|  0|car3|         2|
|  0|car2|         1|
|  0|car1|         2|
|  1|car2|         1|
|  1|car1|         1|
+---+----+----------+

The output I want: a column that, for each row, sums the occurrence counts of the other items under the same id, skipping the current row's own occurrence value.

For example, in the output table below, car3 occurs 2 times for id "0", car2 occurs 1 time, and car1 occurs 2 times.

So for id "0", the sum of the other occurrences for its "car3" row would be car2(1) + car1(2) = 3.
For the same id "0", the sum of the other occurrences for its "car2" row would be car3(2) + car1(2) = 4.

And so on for the remaining rows. Sample output:

+---+----+----------+----------------------+
| id|item|occurences| other_occurences_sum |
+---+----+----------+----------------------+
|  0|car3|         2|          3           |<- (car2+car1) for id 0
|  0|car2|         1|          4           |<- (car3+car1) for id 0  
|  0|car1|         2|          3           |<- (car3+car2) for id 0
|  1|car2|         1|          1           |<- (car1) for id 1
|  1|car1|         1|          1           |<- (car2) for id 1
+---+----+----------+----------------------+

This is a perfect target for window functions.
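
The key observation is that the sum over the other rows is just the per-id total minus the current row's value: for id "0" the total is 2 + 1 + 2 = 5, so the car3 row gets 5 - 2 = 3 and the car2 row gets 5 - 1 = 4. That identity turns the problem into a plain windowed sum: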

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val w = Window.partitionBy("id")

df2.withColumn(
  "other_occurences_sum", sum($"occurences").over(w) - $"occurences"
).show
// +---+----+----------+--------------------+     
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// |  0|car3|         2|                   3|
// |  0|car2|         1|                   4|
// |  0|car1|         2|                   3|
// |  1|car2|         1|                   1|
// |  1|car1|         1|                   1|
// +---+----+----------+--------------------+

where sum($"occurences").over(w) is the sum of all occurrences for the current id, and subtracting the row's own occurences leaves the rest. Of course, a join works just as well:

df2.join(
  df2.groupBy("id").agg(sum($"occurences") as "total"), Seq("id")
).select(
    $"*", ($"total" - $"occurences") as "other_occurences_sum"
).drop("total").show  // drop the helper "total" column to match the output below

// +---+----+----------+--------------------+
// | id|item|occurences|other_occurences_sum|
// +---+----+----------+--------------------+
// |  0|car3|         2|                   3|
// |  0|car2|         1|                   4|
// |  0|car1|         2|                   3|
// |  1|car2|         1|                   1|
// |  1|car1|         1|                   1|
// +---+----+----------+--------------------+
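
For reference, the same window computation can also be written in Spark SQL (a sketch; the view name df2_view is an arbitrary choice):

// Register the aggregated frame as a temporary view, then express the
// windowed sum in SQL. SUM(...) OVER (PARTITION BY id) is the per-id total.
df2.createOrReplaceTempView("df2_view")
spark.sql("""
  SELECT *,
         SUM(occurences) OVER (PARTITION BY id) - occurences AS other_occurences_sum
  FROM df2_view
""").show

Both variants have to shuffle by id either way, so the window version is usually preferable here: it expresses the computation in a single pass and avoids the extra join.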