Apache Spark:广播连接行为:连接表和临时表的过滤

Apache Spark: broadcast join behaviour: filtering of joined tables and temp tables

我需要在 spark 中加入 2 tables。 但是我没有完全加入 2 tables,而是先过滤掉第二个 table:

的一部分
spark.sql("select * from a join b on a.key=b.key where b.value='xxx' ")

我想在这种情况下使用广播连接。

Spark 有一个参数定义最大 table 广播连接大小:spark.sql.autoBroadcastJoinThreshold:

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run. http://spark.apache.org/docs/2.4.0/sql-performance-tuning.html

关于此设置我有以下问题:

  1. 哪个 table 大小的 spark 将与 autoBroadcastJoinThreshold 的值进行比较:完整大小,或应用 where 子句后的大小?
  2. 我假设 spark 将在广播之前应用 where 子句,对吗?
  3. 文档说我需要预先 运行 Hive 的分析 Table 命令。当我将临时视图用作 table 时,它将如何工作?据我了解,我无法针对通过 dataFrame.createorReplaceTempView("b") 创建的 spark 临时视图 运行 分析 Table 命令。我可以播放临时视图内容吗?

对选项 2 的理解是正确的。 您无法在 spark 中分析 TEMP table 。阅读 here

如果您想带头并指定要广播的数据帧,而不是由 spark 决定,可以使用下面的代码片段-

df = df1.join(F.broadcast(df2),df1.some_col == df2.some_col, "left")

我继续做了一些小实验来回答你的第一个问题。

问题 1:

  • 创建了一个包含 3 行的数据框 a [key,df_a_column]
  • 创建了一个包含 10 行 [key,value] 的数据框 b
  • 运行: spark.sql("SELECT * FROM a JOIN b ON a.key = b.key").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#168]
:  +- LocalTableScan [key#122, df_a_column#123]
+- *(1) LocalTableScan [key#111, value#112]

正如预期的那样,广播了 3 行的 Smaller df a

  • 运行 : spark.sql("SELECT * FROM a JOIN b ON a.key = b.key where b.value=\"bat\"").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#152]
   +- LocalTableScan [key#111, value#112]

在这里您可以注意到数据帧 b 已广播!这意味着 spark 评估 size AFTER applying where 以选择要广播的内容。

问题 2 :

是的,你是对的。从之前的输出中可以明显看出它首先适用于哪里。

问题 3 : 不,你不能分析,但你可以通过暗示 spark 来广播 tempView table,即使在 SQL 中也是如此。 ref

示例:spark.sql("SELECT /*+ BROADCAST(b) */ * FROM a JOIN b ON a.key = b.key")

如果你现在看到解释:

== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#184]
   +- LocalTableScan [key#111, value#112]

现在,如果您看到,数据帧 b 已广播,即使它有 10 行。 问题1,没有提示,a被广播了。

注意:SQL spark 中的广播提示可用于 2.2


了解物理计划的提示:

  • LocalTableScan[ list of columns ]
  • 中找出数据帧
  • 正在广播 BroadcastExchange 的子 tree/list 下的数据帧。