Spark 中针对存储在 Hive 表中的图形的长线性查询

Long linear queries in Spark against a graph stored in Hive tables

假设我有一个图 G 和以下查询:

     x     y     z     w     q     r    s
(?a)--(?b)--(?c)--(?d)--(?e)--(?f)--(?g)--(?h)

其中 {?a, ?b, ?c, ..., ?h} 是变量,{x, y, z, w , q, r, s} 是圆弧标签。

在存储级别,每个标签都有一个 table,但也有两个标签的组合。 例如,我可能有一个 table x|a|b|,但我也有一个 table xy|a|b|c|。是的,我有多余的 tables.

基于这个设置我有两个问题:

a) 我需要找到 tables 以便它们之间的连接导致最佳执行时间(最小)。 设 {xy zw, q, rs} 为上述示例中的那些 table。

b) 我必须按给定顺序执行连接,因此我需要找到该顺序,例如:(rs ⨝ q) ⨝ (zw ⨝ xy) ( ⨝ 是自然连接)。

假设我知道要使用哪个 table,即我已经解决了 a),我的问题是如何解决第二个问题。 Spark API 允许我在一行中执行所有连接:

val res1 = xy.join(zw, Seq("c")).join(q, Seq("e")).join(rs, Seq("f"))

但我也可以分几行执行:

val tmp1 = xy.join(zw, Seq("c"))

val tmp2 = q.join(rs, Seq("f"))

val res2 = tmp1.join(tmp2, Seq("e"))

res1.count 和 res2.count 的执行时间(几次运行的平均值)在我的实验中是不同的。树的构建方式似乎对执行有影响。

1) 我可以使用哪种策略来构建导致 Spark 中最佳执行时间的树?

2) 如果每棵不同的树似乎导致不同的性能,那么查询优化器的作用是什么。连接排序。它似乎什么也没做,尤其是在我将所有连接都放在一行代码中的情况下:

val res1 = xy.join(zw, Seq("c")).join(q, Seq("e")).join(rs, Seq("f"))

val res3 = rs.join(q, Seq("f")).join(zw, Seq("e")).join(xy, Seq("c"))

在一种情况下,我可以有一个合理的执行时间。在另一个超时。 Catalyst 没有做任何事情吗?

The Spark API allows me to execute all joins in a single line:

but I can also execute it in several lines:

不正确。这个时候没有执行,只有当你执行一个动作的时候。您展示的是在 Scala 中使用创建相同查询计划的高级运算符编写相同计算图的不同方法。

1) Which strategy can I use to build a tree which leads to the optimal execution time in Spark?

这就是所谓的 Catalyst Optimizer(不是您)的目的。您可能想要探索 CostBasedJoinReorder 逻辑优化和 JoinSelection 执行计划策略,它们负责确保连接的最佳性能。

JoinSelection SparkPlanner 使用执行规划策略将 Join 逻辑运算符规划为受支持的连接物理运算符之一。

CostBasedJoinReorder 是基于成本的优化中重新排序连接的逻辑优化。

如果表的大小很重要,请考虑基于成本的优化 (CBO)。你应该看到区别。您必须使用表格(不是任何关系)并为统计信息执行 ANALYZE TABLE COMPUTE STATISTICS 命令。

Isn't Catalyst doing anything?

它应该优化联接。解释查询计划以获取更多详细信息。