Merging equi-partitioned data frames in Spark
In Hadoop, joining/merging large equi-partitioned data sets can be done without re-shuffling and without a reduce phase, simply by using a map-side join with CompositeInputFormat.
I am trying to figure out how to do the same thing in Spark:
val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
.repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
.repartition(col("k")).cache()
val xy = x.join(y, x.col("k") === y.col("k"), "outer")
x.show() y.show() xy.show()
+---+---+ +---+---+ +----+----+----+----+
| k| v| | k| v| | k| v| k| v|
+---+---+ +---+---+ +----+----+----+----+
| A| 6| | C| 12| | A| 4|null|null|
| B| 5| | D| 11| | B| 3|null|null|
| C| 4| | E| 10| | C| 2| C| 8|
| D| 3| | F| 9| | D| 1| D| 7|
| E| 2| | G| 8| |null|null| E| 6|
| F| 1| | H| 7| |null|null| F| 5|
+---+---+ +---+---+ +----+----+----+----+
So far so good.
But when I check the execution plan, I see "unnecessary" sorts:
xy.explain
== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
: +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
+- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None
Can the sorting be avoided here?
EDIT
For reference, Hadoop has had this "feature" since 2007:
https://issues.apache.org/jira/browse/HADOOP-2085
UPDATE
As Lezzar pointed out, repartition() alone is not enough to achieve an equi-partitioned, sorted state.
I think it now needs to be followed by sortWithinPartitions().
So this should do the trick:
val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
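(For the plan below, xy is assumed to be rebuilt against the re-created x and y, exactly as before:)
val xy = x.join(y, x.col("k") === y.col("k"), "outer")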
xy.explain()
== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None
No more sorting!
Similar to map-side joining in Hadoop, Spark has broadcast joins, which ship the table data to all workers, much like the distributed cache does in Hadoop MapReduce. Please refer to the Spark documentation or search for "Spark broadcast hash join". Unlike Hive, Spark handles this automatically, so there is no need to worry about it.
That said, there are a few parameters you need to be aware of:
-> spark.sql.autoBroadcastJoinThreshold,spark 自动广播 table 的大小。
You can try the code below to get a feel for broadcast joins; you can also refer to the Spark documentation on broadcast joins, or google it for more details.
Sample code to try:
val sqlContext = new HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS tab3 (key INT, value STRING)")
sqlContext.sql("INSERT INTO tab4 SELECT 1, \"srini\" FROM sr23")

(I created another table just so I could insert a record. Since Hive only supports INSERT INTO ... SELECT, I used this trick to get some data.) You can skip this step as well, since you only want to see the physical plan.
------ You can also use any Hive table that already exists instead; I am just trying to simulate a Hive table, that's it. ---

val srini_df1 = sqlContext.sql("ANALYZE TABLE tab4 COMPUTE STATISTICS NOSCAN")
val df2 = sc.parallelize(Seq((5, "F"), (6, "E"), (7, "sri"), (1, "test"))).toDF("key", "value")
val join_df = sqlContext.sql("SELECT * FROM tab5").join(df2, "key")
join_df.explain
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(530360) called with curMem=238151, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 517.9 KB, free 529.3 MB)
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(42660) called with curMem=768511, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 41.7 KB, free 529.2 MB)
16/03/15 22:40:09 INFO storage.BlockManagerInfo: Added broadcast_23_piece0 in memory on localhost:63721 (size: 41.7 KB, free: 529.9 MB)
16/03/15 22:40:09 INFO spark.SparkContext: Created broadcast 23 from explain at <console>:28
== Physical Plan ==
Project [key#374,value#375,value#370]
BroadcastHashJoin [key#374], [key#369], BuildLeft
HiveTableScan [key#374,value#375], (MetastoreRelation default, tab5, None)
Project [_1#367 AS key#369,_2#368 AS value#370]
Scan PhysicalRDD[_1#367,_2#368]
Why do you say the sort is unnecessary? A merge join requires the data to be sorted. IMHO there is no better strategy than a merge join for performing a full outer join, unless one of your data frames is small enough to be broadcast.