Spark Scala:多次查询相同 table
Spark Scala: Querying same table multiple times
我正在尝试从同一个 table (bigTable) 查询多个列以生成一些聚合列(column1_sum、column2_sum、column3_count)。最后,我将所有列连接在一起形成一个 table.
下面的代码
val t1 = bigTable
.filter($"column10" === value1)
.groupBy("key1","key2")
.agg(sum("column1") as "column1_sum")
val t2 = bigTable
.filter($"column11"===1)
.filter($"column10" === value1)
.groupBy("key1","key2")
.agg(sum("column2") as "column2_sum")
val t3 = bigTable
.filter($"column10" === value3)
.groupBy("key1","key2")
.agg(countDistinct("column3") as "column3_count")
tAll
.join(t1,Seq("key1","key2"),"left_outer")
.join(t2,Seq("key1","key2"),"left_outer")
.join(t3,Seq("key1","key2"),"left_outer")
以上代码有问题
bigTable 是一个巨大的 table(它 运行 包含数百万行)。因此,多次查询效率不高。查询 运行.
花费了大量时间
关于如何以更有效的方式实现相同输出的任何想法?有没有办法减少查询bigTable的次数?
非常感谢。
最简单的改进是仅作为单个聚合执行,其中 predicated 被推入 CASE ... WHEN ...
块,并用近似等效的
替换 countDistinct
tAll
.groupBy("key1","key2")
.agg(
sum(
when($"column10" === "value1", $"column1")
).as("column1_sum"),
sum(
when($"column10" === "value1" and $"column11" === 1, $"column2")
).as("column2_sum"),
approx_count_distinct(
when($"column10" === "value3", $"column3")
).as("column3_count"))
.join(tAll, Seq("key1", "key2"), "right_outer"))
根据所使用的函数和有关数据分布的先验知识,您还可以尝试用具有类似 CASE ... WHEN ...
逻辑的 window 函数替换聚合
import org.apache.spark.sql.expressions.Window
val w = Window
.partitionBy("key1", "key2")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
tAll
.withColumn(
"column1_sum",
sum(when($"column10" === "value1", $"column1")).over(w))
...
但这通常是一种不太稳定的方法。
您还应该考虑使用分组列进行分桶 bigTable
:
val n: Int = ??? // Number of buckets
bigTable.write.bucketBy(n, "key1", "key2").saveAsTable("big_table_clustered")
val bigTableClustered = spark.table("big_table_clustered")
我的代码的主要改进之一是查询 bigTable 一次,而不是问题中提到的多次。
我正在尝试的一段代码(我的代码类似,这只是一个例子):
bigTable
.filter($"column10" === value1)
.groupBy("key1", "key2")
.agg(
sum("column1") as "column1_sum",
sum("column2") as "column2_sum",
countDistinct(when($"column11"===1, col("column3"))) as "column3_count"
)
我正在尝试从同一个 table (bigTable) 查询多个列以生成一些聚合列(column1_sum、column2_sum、column3_count)。最后,我将所有列连接在一起形成一个 table.
下面的代码
val t1 = bigTable
.filter($"column10" === value1)
.groupBy("key1","key2")
.agg(sum("column1") as "column1_sum")
val t2 = bigTable
.filter($"column11"===1)
.filter($"column10" === value1)
.groupBy("key1","key2")
.agg(sum("column2") as "column2_sum")
val t3 = bigTable
.filter($"column10" === value3)
.groupBy("key1","key2")
.agg(countDistinct("column3") as "column3_count")
tAll
.join(t1,Seq("key1","key2"),"left_outer")
.join(t2,Seq("key1","key2"),"left_outer")
.join(t3,Seq("key1","key2"),"left_outer")
以上代码有问题
bigTable 是一个巨大的 table(它 运行 包含数百万行)。因此,多次查询效率不高。查询 运行.
花费了大量时间关于如何以更有效的方式实现相同输出的任何想法?有没有办法减少查询bigTable的次数?
非常感谢。
最简单的改进是仅作为单个聚合执行,其中 predicated 被推入 CASE ... WHEN ...
块,并用近似等效的
countDistinct
tAll
.groupBy("key1","key2")
.agg(
sum(
when($"column10" === "value1", $"column1")
).as("column1_sum"),
sum(
when($"column10" === "value1" and $"column11" === 1, $"column2")
).as("column2_sum"),
approx_count_distinct(
when($"column10" === "value3", $"column3")
).as("column3_count"))
.join(tAll, Seq("key1", "key2"), "right_outer"))
根据所使用的函数和有关数据分布的先验知识,您还可以尝试用具有类似 CASE ... WHEN ...
逻辑的 window 函数替换聚合
import org.apache.spark.sql.expressions.Window
val w = Window
.partitionBy("key1", "key2")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
tAll
.withColumn(
"column1_sum",
sum(when($"column10" === "value1", $"column1")).over(w))
...
但这通常是一种不太稳定的方法。
您还应该考虑使用分组列进行分桶 bigTable
:
val n: Int = ??? // Number of buckets
bigTable.write.bucketBy(n, "key1", "key2").saveAsTable("big_table_clustered")
val bigTableClustered = spark.table("big_table_clustered")
我的代码的主要改进之一是查询 bigTable 一次,而不是问题中提到的多次。
我正在尝试的一段代码(我的代码类似,这只是一个例子):
bigTable
.filter($"column10" === value1)
.groupBy("key1", "key2")
.agg(
sum("column1") as "column1_sum",
sum("column2") as "column2_sum",
countDistinct(when($"column11"===1, col("column3"))) as "column3_count"
)