Spark SQL 二次过滤和分组
Spark SQL secondary filtering and grouping
问题:我有一个数据集 A {filed1, field2, field3...},我想首先将 A 分组为 field1
,然后在每个结果组中,我想做一堆子查询,例如,计算具有 field2 == true
的行数,或计算具有 [=15] 的不同 field3
的数量=] 和 field5 == false
,等等
我能想到的一些替代方案:我可以编写一个自定义的用户定义的聚合函数,该函数采用计算过滤条件的函数,但这样我必须创建一个每个查询条件的实例。我也看过 countDistinct
函数可以实现一些操作,但我想不通如何使用它来实现 filter-distinct-count 语义。
在 Pig 中,我可以做到:
FOREACH (GROUP A by field1) {
field_a = FILTER A by field2 == TRUE;
field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
field_c = DISTINCT field_b.field3;
GENERATE FLATTEN(group),
COUNT(field_a) as fa,
COUNT(field_b) as fb,
COUNT(field_c) as fc,
有没有办法在 Spark SQL 中做到这一点?
不包括不同的计数,这可以通过条件的简单求和来解决:
import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(
(1L, true, "x", "foo", true), (1L, true, "y", "bar", false),
(1L, true, "z", "foo", true), (2L, false, "y", "bar", false),
(2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")
val left = df.groupBy($"field1").agg(
sum($"field2".cast("int")).alias("fa"),
sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show
// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// | 1| 3| 0|
// | 2| 1| 1|
// +------+---+---+
不幸的是要棘手得多。 GROUP BY
Spark SQL 中的子句。更不用说寻找不同的元素是非常昂贵的。也许您能做的最好的事情就是分别计算不同的计数并简单地连接结果:
val right = df.where($"field4" === "foo" && ! $"field5")
.select($"field1".alias("field1_"), $"field3")
.distinct
.groupBy($"field1_")
.agg(count("*").alias("fc"))
val joined = left
.join(right, $"field1" === $"field1_", "leftouter")
.na.fill(0)
使用 UDAF 计算每个条件的不同值绝对是一种选择,但有效实施将相当棘手。从内部表示转换相当昂贵,使用集合存储实现快速 UDAF 也不便宜。如果你可以接受近似解,你可以在那里使用布隆过滤器。
问题:我有一个数据集 A {filed1, field2, field3...},我想首先将 A 分组为 field1
,然后在每个结果组中,我想做一堆子查询,例如,计算具有 field2 == true
的行数,或计算具有 [=15] 的不同 field3
的数量=] 和 field5 == false
,等等
我能想到的一些替代方案:我可以编写一个自定义的用户定义的聚合函数,该函数采用计算过滤条件的函数,但这样我必须创建一个每个查询条件的实例。我也看过 countDistinct
函数可以实现一些操作,但我想不通如何使用它来实现 filter-distinct-count 语义。
在 Pig 中,我可以做到:
FOREACH (GROUP A by field1) {
field_a = FILTER A by field2 == TRUE;
field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
field_c = DISTINCT field_b.field3;
GENERATE FLATTEN(group),
COUNT(field_a) as fa,
COUNT(field_b) as fb,
COUNT(field_c) as fc,
有没有办法在 Spark SQL 中做到这一点?
不包括不同的计数,这可以通过条件的简单求和来解决:
import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(
(1L, true, "x", "foo", true), (1L, true, "y", "bar", false),
(1L, true, "z", "foo", true), (2L, false, "y", "bar", false),
(2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")
val left = df.groupBy($"field1").agg(
sum($"field2".cast("int")).alias("fa"),
sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show
// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// | 1| 3| 0|
// | 2| 1| 1|
// +------+---+---+
不幸的是要棘手得多。 GROUP BY
Spark SQL
val right = df.where($"field4" === "foo" && ! $"field5")
.select($"field1".alias("field1_"), $"field3")
.distinct
.groupBy($"field1_")
.agg(count("*").alias("fc"))
val joined = left
.join(right, $"field1" === $"field1_", "leftouter")
.na.fill(0)
使用 UDAF 计算每个条件的不同值绝对是一种选择,但有效实施将相当棘手。从内部表示转换相当昂贵,使用集合存储实现快速 UDAF 也不便宜。如果你可以接受近似解,你可以在那里使用布隆过滤器。