Spark SQL 二次过滤和分组

Spark SQL secondary filtering and grouping

问题:我有一个数据集 A {filed1, field2, field3...},我想首先将 A 分组为 field1 ,然后在每个结果组中,我想做一堆子查询,例如,计算具有 field2 == true 的行数,或计算具有 [=15] 的不同 field3 的数量=] 和 field5 == false,等等

我能想到的一些替代方案:我可以编写一个自定义的用户定义的聚合函数,该函数采用计算过滤条件的函数,但这样我必须创建一个每个查询条件的实例。我也看过 countDistinct 函数可以实现一些操作,但我想不通如何使用它来实现 filter-distinct-count 语义。

在 Pig 中,我可以做到:

FOREACH (GROUP A by field1) {
        field_a = FILTER A by field2 == TRUE;
        field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
        field_c = DISTINCT field_b.field3;

        GENERATE  FLATTEN(group),
                  COUNT(field_a) as fa,
                  COUNT(field_b) as fb,
                  COUNT(field_c) as fc,

有没有办法在 Spark SQL 中做到这一点?

不包括不同的计数,这可以通过条件的简单求和来解决:

import org.apache.spark.sql.functions.sum

val df = sc.parallelize(Seq(
  (1L, true, "x", "foo", true), (1L, true, "y", "bar", false), 
  (1L, true, "z", "foo", true), (2L, false, "y", "bar", false), 
  (2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")

val left = df.groupBy($"field1").agg(
  sum($"field2".cast("int")).alias("fa"),
  sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show

// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// |     1|  3|  0|
// |     2|  1|  1|
// +------+---+---+

不幸的是要棘手得多。 GROUP BY Spark SQL 中的子句。更不用说寻找不同的元素是非常昂贵的。也许您能做的最好的事情就是分别计算不同的计数并简单地连接结果:

val right = df.where($"field4" === "foo" && ! $"field5")
  .select($"field1".alias("field1_"), $"field3")
  .distinct
  .groupBy($"field1_")
  .agg(count("*").alias("fc"))

val joined = left
  .join(right, $"field1" === $"field1_", "leftouter")
  .na.fill(0)

使用 UDAF 计算每个条件的不同值绝对是一种选择,但有效实施将相当棘手。从内部表示转换相当昂贵,使用集合存储实现快速 UDAF 也不便宜。如果你可以接受近似解,你可以在那里使用布隆过滤器。