如何在 Databricks Deltalake 上将 spark 数据框与配置单元 table 合并?
How to merge a spark dataframe with hive table on Databricks Deltalake?
我有一个数据框如下:
val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
我有一个配置单元 table 具有相同的列和确切的架构:
val df2 = spark.sql("select * from db.table")
从传入的数据帧df1
我得到了两条新记录和两条更新记录。
val df2 = spark.sql("select * from db.table where name in ('James', 'Michael')")
df2.show()
+-------+-------+---+----------+
| name| dept| id| role|
+-------+-------+---+----------+
| James| Sales| 34| Associate|
|Michael| Sales| 56| Junior|
+-------+-------+---+----------+
这里使用的键是:dept
& id
在我之前的一个项目中,我们曾经在暂存 table 中将传入的数据帧与 Hive table 的分区连接起来,并且只是 运行 exchange partition
为了将现有的配置单元分区与包含合并数据的分段 table 交换。
我们正在使用 Spark 的 Databricks 分布。
我们的配置单元 table 建立在 Databricks delta lake 上并且有数百万行。
有没有其他方法可以将我的传入数据帧 df1 与我的配置单元 table 合并?
如果是这样,我怎样才能在不影响性能的情况下实现它。
正如 Tim 所提到的,如果您的目的地 table 已经在 Delta 上,那么您只需要使用 MERGE INTO
SQL 命令,或相应的 Scala API(参见 docs on Delta Merge)。你需要这样的东西:
import io.delta.tables._
import org.apache.spark.sql.functions._
DeltaTable.forName(spark, "db.table")
.as("target")
.merge(
df1.as("updates"),
"target.dept = updates.dept and target.id = updates.id")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()
不匹配的数据将原样插入,匹配的数据将放入包含原始记录的重写文件中。通常重写是主要的性能损失,您可能需要减小文件大小以重写更少的数据(参见 docs) - in newer versions it's possible to configure tables such way so Databricks Spark engine will automatically find optimal file size to decrease the rewrite time without affecting the read patterns (see docs)
我有一个数据框如下:
val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
我有一个配置单元 table 具有相同的列和确切的架构:
val df2 = spark.sql("select * from db.table")
从传入的数据帧df1
我得到了两条新记录和两条更新记录。
val df2 = spark.sql("select * from db.table where name in ('James', 'Michael')")
df2.show()
+-------+-------+---+----------+
| name| dept| id| role|
+-------+-------+---+----------+
| James| Sales| 34| Associate|
|Michael| Sales| 56| Junior|
+-------+-------+---+----------+
这里使用的键是:dept
& id
在我之前的一个项目中,我们曾经在暂存 table 中将传入的数据帧与 Hive table 的分区连接起来,并且只是 运行 exchange partition
为了将现有的配置单元分区与包含合并数据的分段 table 交换。
我们正在使用 Spark 的 Databricks 分布。 我们的配置单元 table 建立在 Databricks delta lake 上并且有数百万行。 有没有其他方法可以将我的传入数据帧 df1 与我的配置单元 table 合并? 如果是这样,我怎样才能在不影响性能的情况下实现它。
正如 Tim 所提到的,如果您的目的地 table 已经在 Delta 上,那么您只需要使用 MERGE INTO
SQL 命令,或相应的 Scala API(参见 docs on Delta Merge)。你需要这样的东西:
import io.delta.tables._
import org.apache.spark.sql.functions._
DeltaTable.forName(spark, "db.table")
.as("target")
.merge(
df1.as("updates"),
"target.dept = updates.dept and target.id = updates.id")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()
不匹配的数据将原样插入,匹配的数据将放入包含原始记录的重写文件中。通常重写是主要的性能损失,您可能需要减小文件大小以重写更少的数据(参见 docs) - in newer versions it's possible to configure tables such way so Databricks Spark engine will automatically find optimal file size to decrease the rewrite time without affecting the read patterns (see docs)