Apache Spark:广播连接行为:连接表和临时表的过滤
Apache Spark: broadcast join behaviour: filtering of joined tables and temp tables
我需要在 spark 中加入 2 tables。
但是我没有完全加入 2 tables,而是先过滤掉第二个 table:
的一部分
spark.sql("select * from a join b on a.key=b.key where b.value='xxx' ")
我想在这种情况下使用广播连接。
Spark 有一个参数定义最大 table 广播连接大小:spark.sql.autoBroadcastJoinThreshold
:
Configures the maximum size in bytes for a table that will be
broadcast to all worker nodes when performing a join. By setting this
value to -1 broadcasting can be disabled. Note that currently
statistics are only supported for Hive Metastore tables where the
command ANALYZE TABLE COMPUTE STATISTICS noscan has been
run. http://spark.apache.org/docs/2.4.0/sql-performance-tuning.html
关于此设置我有以下问题:
- 哪个 table 大小的 spark 将与 autoBroadcastJoinThreshold 的值进行比较:完整大小,或应用 where 子句后的大小?
- 我假设 spark 将在广播之前应用 where 子句,对吗?
- 文档说我需要预先 运行 Hive 的分析 Table 命令。当我将临时视图用作 table 时,它将如何工作?据我了解,我无法针对通过 dataFrame.createorReplaceTempView("b") 创建的 spark 临时视图 运行 分析 Table 命令。我可以播放临时视图内容吗?
对选项 2 的理解是正确的。
您无法在 spark 中分析 TEMP table 。阅读 here
如果您想带头并指定要广播的数据帧,而不是由 spark 决定,可以使用下面的代码片段-
df = df1.join(F.broadcast(df2),df1.some_col == df2.some_col, "left")
我继续做了一些小实验来回答你的第一个问题。
问题 1:
- 创建了一个包含 3 行的数据框
a
[key,df_a_column]
- 创建了一个包含 10 行 [key,value] 的数据框
b
- 运行:
spark.sql("SELECT * FROM a JOIN b ON a.key = b.key").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#168]
: +- LocalTableScan [key#122, df_a_column#123]
+- *(1) LocalTableScan [key#111, value#112]
正如预期的那样,广播了 3 行的 Smaller df a
。
- 运行 :
spark.sql("SELECT * FROM a JOIN b ON a.key = b.key where b.value=\"bat\"").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#152]
+- LocalTableScan [key#111, value#112]
在这里您可以注意到数据帧 b
已广播!这意味着 spark 评估 size AFTER applying where
以选择要广播的内容。
问题 2 :
是的,你是对的。从之前的输出中可以明显看出它首先适用于哪里。
问题 3 :
不,你不能分析,但你可以通过暗示 spark 来广播 tempView table,即使在 SQL 中也是如此。 ref
示例:spark.sql("SELECT /*+ BROADCAST(b) */ * FROM a JOIN b ON a.key = b.key")
如果你现在看到解释:
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#184]
+- LocalTableScan [key#111, value#112]
现在,如果您看到,数据帧 b
已广播,即使它有 10 行。
问题1,没有提示,a
被广播了。
注意:SQL spark 中的广播提示可用于 2.2
了解物理计划的提示:
- 从
LocalTableScan[ list of columns ]
中找出数据帧
- 正在广播
BroadcastExchange
的子 tree/list 下的数据帧。
我需要在 spark 中加入 2 tables。 但是我没有完全加入 2 tables,而是先过滤掉第二个 table:
的一部分spark.sql("select * from a join b on a.key=b.key where b.value='xxx' ")
我想在这种情况下使用广播连接。
Spark 有一个参数定义最大 table 广播连接大小:spark.sql.autoBroadcastJoinThreshold
:
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run. http://spark.apache.org/docs/2.4.0/sql-performance-tuning.html
关于此设置我有以下问题:
- 哪个 table 大小的 spark 将与 autoBroadcastJoinThreshold 的值进行比较:完整大小,或应用 where 子句后的大小?
- 我假设 spark 将在广播之前应用 where 子句,对吗?
- 文档说我需要预先 运行 Hive 的分析 Table 命令。当我将临时视图用作 table 时,它将如何工作?据我了解,我无法针对通过 dataFrame.createorReplaceTempView("b") 创建的 spark 临时视图 运行 分析 Table 命令。我可以播放临时视图内容吗?
对选项 2 的理解是正确的。 您无法在 spark 中分析 TEMP table 。阅读 here
如果您想带头并指定要广播的数据帧,而不是由 spark 决定,可以使用下面的代码片段-
df = df1.join(F.broadcast(df2),df1.some_col == df2.some_col, "left")
我继续做了一些小实验来回答你的第一个问题。
问题 1:
- 创建了一个包含 3 行的数据框
a
[key,df_a_column] - 创建了一个包含 10 行 [key,value] 的数据框
b
- 运行:
spark.sql("SELECT * FROM a JOIN b ON a.key = b.key").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#168]
: +- LocalTableScan [key#122, df_a_column#123]
+- *(1) LocalTableScan [key#111, value#112]
正如预期的那样,广播了 3 行的 Smaller df a
。
- 运行 :
spark.sql("SELECT * FROM a JOIN b ON a.key = b.key where b.value=\"bat\"").explain()
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#152]
+- LocalTableScan [key#111, value#112]
在这里您可以注意到数据帧 b
已广播!这意味着 spark 评估 size AFTER applying where
以选择要广播的内容。
问题 2 :
是的,你是对的。从之前的输出中可以明显看出它首先适用于哪里。
问题 3 : 不,你不能分析,但你可以通过暗示 spark 来广播 tempView table,即使在 SQL 中也是如此。 ref
示例:spark.sql("SELECT /*+ BROADCAST(b) */ * FROM a JOIN b ON a.key = b.key")
如果你现在看到解释:
== Physical Plan ==
*(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
:- *(1) LocalTableScan [key#122, df_a_column#123]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#184]
+- LocalTableScan [key#111, value#112]
现在,如果您看到,数据帧 b
已广播,即使它有 10 行。
问题1,没有提示,a
被广播了。
注意:SQL spark 中的广播提示可用于 2.2
了解物理计划的提示:
- 从
LocalTableScan[ list of columns ]
中找出数据帧
- 正在广播
BroadcastExchange
的子 tree/list 下的数据帧。