在spark scala中对多个数据帧进行左外连接
Doing left outer join on multiple data frames in spark scala
我是 Spark 的新手。我试图使用 scala 实现以下用例。
-DataFrame 1
| col A | col B |
-----------------
| 1 | a |
| 2 | a |
| 3 | a |
-DataFrame 2
| col A | col B |
-----------------
| 1 | b |
| 3 | b |
-DataFrame 3
| col A | col B |
-----------------
| 2 | c |
| 3 | c |
最终输出帧应该是
| col A | col B |
-----------------
| 1 | a,b |
| 2 | a,c |
| 3 | a,b,c |
帧数不限于 3 ,它可以是小于 100.So 我正在打印每个数据帧的每个帧的任何数字。
有人可以帮助我如何创建最终数据框,在其中我可以输出上述格式的 N 个数据框。
感谢您的帮助。
您可以使用foldLeft
通过外连接迭代合并数据
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val df1 = Seq((1, "a"), (2, "a"), (3, "a")).toDF("Col A", "Col B")
val df2 = Seq((1, "b"), (2, "b")).toDF("Col A", "Col B")
val df3 = Seq((2, "c"), (3, "c")).toDF("Col A", "Col B")
val dfs = Seq(df2, df3)
val bs = (0 to dfs.size).map(i => s"Col B $i")
dfs.foldLeft(df1)(
(acc, df) => acc.join(df, Seq("Col A"), "fullouter")
).toDF("Col A" +: bs: _*).select($"Col A", array(bs map col: _*)).map {
case Row(a: Int, bs: Seq[_]) =>
// Drop nulls and concat
(a, bs.filter(_ != null).map(_.toString).mkString(","))
}.toDF("Col A", "Col B").show
// +-----+-----+
// |Col A|Col B|
// +-----+-----+
// | 1| a,b|
// | 3| a,c|
// | 2|a,b,c|
// +-----+-----+
但如果你真的认为
it can be any number less than 100
那就太不现实了。 join
是 Spark 中最昂贵的操作,即使进行了所有优化器改进,它也无法正常工作。
今天看到这个问题。我建议你使用 python 来解决它。它比 scala 更容易编写。他们是:
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat_ws
d1=sc.parallelize([(1, "a"), (2, "a"), (3,"a")]).toDF().toDF("Col_A","Col_B")
d2=sc.parallelize([(1, "b"), (2, "b")]).toDF().toDF("Col_A", "Col_B")
d3=sc.parallelize([(2, "c"), (3, "c")]).toDF().toDF("Col_A", "Col_B")
d4=d1.join(d2,'Col_A','left').join(d3,'Col_A','left').select(d1.Col_A.alias("col A"),concat_ws(',',d1.Col_B,d2.Col_B,d3.Col_B).alias("col B"))
df4.show()
+-----+-----+
|col A|col B|
+-----+-----+
| 1
| a,b|
| 2
|a,b,c|
| 3
| a,c|
+-----+-----+
结果你看!
我是 Spark 的新手。我试图使用 scala 实现以下用例。
-DataFrame 1
| col A | col B |
-----------------
| 1 | a |
| 2 | a |
| 3 | a |
-DataFrame 2
| col A | col B |
-----------------
| 1 | b |
| 3 | b |
-DataFrame 3
| col A | col B |
-----------------
| 2 | c |
| 3 | c |
最终输出帧应该是
| col A | col B |
-----------------
| 1 | a,b |
| 2 | a,c |
| 3 | a,b,c |
帧数不限于 3 ,它可以是小于 100.So 我正在打印每个数据帧的每个帧的任何数字。
有人可以帮助我如何创建最终数据框,在其中我可以输出上述格式的 N 个数据框。
感谢您的帮助。
您可以使用foldLeft
通过外连接迭代合并数据
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val df1 = Seq((1, "a"), (2, "a"), (3, "a")).toDF("Col A", "Col B")
val df2 = Seq((1, "b"), (2, "b")).toDF("Col A", "Col B")
val df3 = Seq((2, "c"), (3, "c")).toDF("Col A", "Col B")
val dfs = Seq(df2, df3)
val bs = (0 to dfs.size).map(i => s"Col B $i")
dfs.foldLeft(df1)(
(acc, df) => acc.join(df, Seq("Col A"), "fullouter")
).toDF("Col A" +: bs: _*).select($"Col A", array(bs map col: _*)).map {
case Row(a: Int, bs: Seq[_]) =>
// Drop nulls and concat
(a, bs.filter(_ != null).map(_.toString).mkString(","))
}.toDF("Col A", "Col B").show
// +-----+-----+
// |Col A|Col B|
// +-----+-----+
// | 1| a,b|
// | 3| a,c|
// | 2|a,b,c|
// +-----+-----+
但如果你真的认为
it can be any number less than 100
那就太不现实了。 join
是 Spark 中最昂贵的操作,即使进行了所有优化器改进,它也无法正常工作。
今天看到这个问题。我建议你使用 python 来解决它。它比 scala 更容易编写。他们是:
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat_ws
d1=sc.parallelize([(1, "a"), (2, "a"), (3,"a")]).toDF().toDF("Col_A","Col_B")
d2=sc.parallelize([(1, "b"), (2, "b")]).toDF().toDF("Col_A", "Col_B")
d3=sc.parallelize([(2, "c"), (3, "c")]).toDF().toDF("Col_A", "Col_B")
d4=d1.join(d2,'Col_A','left').join(d3,'Col_A','left').select(d1.Col_A.alias("col A"),concat_ws(',',d1.Col_B,d2.Col_B,d3.Col_B).alias("col B"))
df4.show()
+-----+-----+
|col A|col B|
+-----+-----+
| 1
| a,b|
| 2
|a,b,c|
| 3
| a,c|
+-----+-----+
结果你看!