Spark dataframe groupBy 和进一步计数聚合

Spark dataframe groupBy and further count aggregations

数据框示例:

col_1, col_2
aaa, 1
aaa, 0
bbb, 1
bbb, 1
bbb, 1 

我想要一个包含 3 列的结果 df:col_1,它的总行数,以及 col_2 === 1 的行数。

我试过了

df.groupBy($"col_1")
   .agg(count($"col_2" === 1).as("delayed"), count(lit(1)) as "total").show(100)  

为什么总计算正确,延迟却不正确?

$"col2"===1 的元素数量仍然与 $"col2" 相同,它们只是 truefalse。相反,您想转换为整数并求和。 (当然,如果col2的值一直是1或者0,可以直接求和。)

我认为你必须定义一个 udf 来将布尔值转换为整数:

val toInt = udf((x: Boolean) => if(x) 1 else 0)

然后(我没有为我的专栏命名):

scala> df.groupBy($"_1").agg(sum(toInt($"_2"===1)), count($"_2")).show()
+---+------------------+---------+
| _1|sum(UDF((_2 = 1)))|count(_2)|
+---+------------------+---------+
|aaa|                 1|        2|
|bbb|                 3|        3|
+---+------------------+---------+

问题

当你使用 count($"col_2" === 1).as("delayed")

它引用了 count function

public static Column count(Column e) Aggregate function: returns the number of items in a group. Parameters: e - (undocumented) Returns: (undocumented) Since: 1.3.0

而不是scala count function

def count(p : (A) => Boolean) : Int Count the number of elements in the list which satisfy a predicate. Parameters p - the predicate for which to count Returns the number of elements satisfying the predicate p.

所以 count($"col_2" === 1) 中的条件即 $"col_2" === 1 不被视为 truefalse 但作为列。 因此计数函数只计算列 $"col_2" === 1

希望解释清楚易懂。

解决方案

您应该使用when函数将值更改为1或0并且使用sum函数作为计数.

sum(when($"col_2" === 1, 1).otherwise(0)).as("delayed")

如果 col_2 column 总是 0 或 1 那么你可以使用 sum

sum($"col_2").as("delayed")