spark concatenate data frames and merge schema
I have several data frames in Spark with a partly similar schema (header) at the beginning and different (custom) columns at the end.
case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)
val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS
These might look like:
+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+

+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+

+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+
How can I merge the schemas so that all the data frames are basically concatenated into one with a schema like
case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom2:Option[String], custom3:Option[Int],
custom4: Option[Double], custom5:Option[String], `type`:String)
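where the columns that are not present are nullable?
For the first record of the data frame named first, the output should look like
+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|      B|      1|custom1|    Nan|    Nan|    Nan|    Nan|
+-------+-------+-------+-------+-------+-------+-------+-------+
I was thinking of joining the data frames on the header columns. However, only some of them (say header1) would hold the same (actually joinable) values, while the others (header2, header3) would hold different values, i.e.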
first
.join(second, Seq("header1", "header2", "header3"), "LEFT")
.join(third, Seq("header1", "header2", "header3"), "LEFT")
.show
results in
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
which is not correct, as I only want to pd.concat(axis=0) the data frames; most of the records are missing.
Additionally, it lacks a type column identifying the original data frame, i.e. first, second, third.
Edit
I think a classic full outer join is the solution
first
.join(second, Seq("header1", "header2", "header3"), "fullouter")
.join(third, Seq("header1", "header2", "header3"), "fullouter")
.show
yields:
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      B|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      B|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
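As you can see, there is never really a join; the rows are simply concatenated. Is there a simpler operation that achieves the same thing?
This answer is not optimal, because custom1 is a duplicated name. I would rather see a single custom1 column (with no null values where the second one has a value to fill in).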
Please check whether the SQL union approach gives the desired result.
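-- Spark SQL; assumes first, second and third are registered as temp views table1, table2 and table3.
-- UNION ALL keeps duplicate rows (plain concatenation); UNION would remove them.
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table1
UNION ALL
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       custom5
FROM   table2
UNION ALL
SELECT header1,
       header2,
       header3,
       CAST(NULL AS STRING) AS custom1,
       custom2,
       custom3,
       custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table3;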
Basically, you need to union all the frames. To make the schemas match, add each missing column with a dataframe.withColumn(ColumnName, expr("null")) expression:
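import org.apache.spark.sql.functions._
// Add every missing column as a null cast to its target type, then union by column name.
val first1 = first.withColumn("custom2", expr("null").cast("string")).withColumn("custom3", expr("null").cast("int"))
  .withColumn("custom4", expr("null").cast("double")).withColumn("custom5", expr("null").cast("string"))
val second2 = second.withColumn("custom2", expr("null").cast("string")).withColumn("custom3", expr("null").cast("int"))
  .withColumn("custom4", expr("null").cast("double"))
val third3 = third.withColumn("custom1", expr("null").cast("string")).withColumn("custom5", expr("null").cast("string"))
// unionByName (Spark 2.3+) matches columns by name, so their order may differ between frames.
val result = first1.unionByName(second2).unionByName(third3)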
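The same idea can be generalized so that the missing columns do not have to be listed by hand. The following is only a sketch in PySpark, not part of the answer above: merged_columns and concat_frames are hypothetical helper names, and it assumes that a column shared by several frames has the same type in each of them. It pads every frame to the union of all columns, adds the type column asked for in the question, and unions by name.

```python
from functools import reduce


def merged_columns(schemas):
    """Return the ordered union of (name, type) pairs over all schemas."""
    merged = {}
    for schema in schemas:
        for name, dtype in schema:
            # Assumption: a column shared by several frames has a single type.
            if merged.setdefault(name, dtype) != dtype:
                raise ValueError(f"conflicting types for column {name!r}")
    return list(merged.items())


def concat_frames(frames):
    """Concatenate {label: DataFrame} row-wise, padding missing columns with nulls."""
    # Imported here so that merged_columns stays usable without Spark installed.
    from pyspark.sql import functions as F

    # DataFrame.dtypes is a list of (column name, type string) pairs.
    target = merged_columns(df.dtypes for df in frames.values())
    aligned = []
    for label, df in frames.items():
        present = set(df.columns)
        cols = [
            F.col(name) if name in present else F.lit(None).cast(dtype).alias(name)
            for name, dtype in target
        ]
        # The "type" column records which frame each row came from.
        aligned.append(df.select(*cols).withColumn("type", F.lit(label)))
    return reduce(lambda a, b: a.unionByName(b), aligned)
```

Assuming PySpark versions of the three frames, a call such as concat_frames({"first": first, "second": second, "third": third}) should return one frame with all header and custom columns plus type, in order of first appearance.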
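If you are writing the files to HDFS, you can achieve this by setting the property spark.sql.parquet.mergeSchema to true and writing the files to an HDFS location.
The schema is then updated automatically and all columns are returned.
You can achieve this in the following ways:
- withColumn and union
- specifying the schema beforehand and performing a union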
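from pyspark.sql.functions import lit

# Merge the schemas of all Parquet part-files when they are read back.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# schem is the schema of the CSV file, defined elsewhere.
eb = spark.read.format("csv").schema(schem).option("path", "/retail/ebay.csv").load()
eb.printSchema()
eb.write.format("parquet").mode("append").save("/retail/parquet_test")

# Append a second batch that carries an extra column.
eb1 = eb.withColumn("dummy", lit(35))
eb1.printSchema()
eb1.write.format("parquet").mode("append").save("/retail/parquet_test")

# Read back the same directory; the schema now includes dummy.
eb2 = spark.read.parquet("/retail/parquet_test")
eb2.printSchema()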
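For the "withColumn and union" route listed above, Spark 3.1 and later can do the null padding itself: unionByName accepts allowMissingColumns=True and fills columns that are absent from one side with nulls. The sketch below is a hedged illustration rather than part of the answer. The helper name union_all_by_name is hypothetical, and it assumes that shared columns have compatible types.

```python
from functools import reduce


def union_all_by_name(frames):
    """Row-wise concatenation of DataFrames whose column sets differ."""
    # allowMissingColumns=True (Spark 3.1+) fills absent columns with nulls.
    return reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), frames)


# Usage sketch, assuming first, second and third are PySpark DataFrames:
#   from pyspark.sql.functions import lit
#   named = [("first", first), ("second", second), ("third", third)]
#   tagged = [df.withColumn("type", lit(name)) for name, df in named]
#   result = union_all_by_name(tagged)
```

Adding the type column before the union, as in the commented usage, keeps track of which frame each row came from.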