Spark Streaming | Write different data frames to multiple tables in Synapse DW
I have multiple dataframes that are extracted from a single JSON message coming from azure-event-hub. We want to push these DFs to separate tables in Synapse DW using a Spark Streaming job.
Here is my schema -
root
|-- Name: string (nullable = true)
|-- Salary: string (nullable = true)
|-- EmpID: string (nullable = true)
|-- Projects: struct (nullable = true)
| |-- ProjectID: string (nullable = true)
| |-- ProjectName: string (nullable = true)
| |-- Duration: string (nullable = true)
| |-- Location: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- City: string (nullable = true)
| | | |-- State: string (nullable = true)
| |-- Contact: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Phone: string (nullable = true)
| | | |-- email: string (nullable = true)
I have extracted 4 different dataframes from the above schema -
- Project
- Location
- Contact
- Employee
All of them should be inserted into 4 different tables in Synapse
ProjectDf.write.format("spark.sqldw").options(.dbo.Project).save(...)
LocationDf.write.format("spark.sqldw").options(.dbo.Loc).save(...)
ContactDf.write.format("spark.sqldw").options(.dbo.Contact).save(...)
EmployeeDf.write.format("spark.sqldw").options(.dbo.Emp).save(...)
Please suggest how to apply the foreachBatch sink here to insert into these tables.
If you plan to write four different dataframes based on one single input streaming dataframe, you can use foreachBatch in the following way:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// as you plan to use the batchDF to create multiple outputs it might be worth persisting the batchDF
batchDF.persist()
// create the four different Dataframes based on the input
val ProjectDf = batchDF.select(...)
val LocationDf = batchDF.select(...)
val ContactDf = batchDF.select(...)
val EmployeeDf = batchDF.select(...)
// then you can save those four Dataframes into the desired locations
ProjectDf.write.format("spark.sqldw").options(.dbo.Project).save(...)
LocationDf.write.format("spark.sqldw").options(.dbo.Loc).save(...)
ContactDf.write.format("spark.sqldw").options(.dbo.Contact).save(...)
EmployeeDf.write.format("spark.sqldw").options(.dbo.Emp).save(...)
// do not forget to unpersist your batchDF
batchDF.unpersist()
}
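The select(...) placeholders above are not spelled out in the question. Based on the schema shown earlier, the extraction could look roughly like the sketch below; this is only an assumption about how you want to flatten the nested struct and arrays, so the column choices and the explode calls may differ in your actual job:

import org.apache.spark.sql.functions.{col, explode}

// hypothetical extraction derived from the schema in the question
val EmployeeDf = batchDF.select(col("EmpID"), col("Name"), col("Salary"))
val ProjectDf = batchDF.select(
  col("EmpID"),
  col("Projects.ProjectID"),
  col("Projects.ProjectName"),
  col("Projects.Duration"))
// the Location and Contact arrays need to be flattened, e.g. with explode
val LocationDf = batchDF
  .select(col("Projects.ProjectID").as("ProjectID"), explode(col("Projects.Location")).as("Loc"))
  .select(col("ProjectID"), col("Loc.City"), col("Loc.State"))
val ContactDf = batchDF
  .select(col("Projects.ProjectID").as("ProjectID"), explode(col("Projects.Contact")).as("C"))
  .select(col("ProjectID"), col("C.Phone"), col("C.email"))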
The use of foreachBatch is described in the documentation under Using foreach and foreachBatch.
In case you get the exception "overloaded method foreachBatch with alternatives", you can have a look at the release notes of Databricks Runtime 7.0, which say:
"To fix the compilation error, change foreachBatch { (df, id) => myFunc(df, id) }
to foreachBatch(myFunc _)
or use the Java API explicitly: foreachBatch(new VoidFunction2 ...)."
That means your code would look like this:
def myFunc(batchDF: DataFrame, batchId: Long): Unit = {
// as you plan to use the batchDF to create multiple outputs it might be worth persisting the batchDF
batchDF.persist()
// create the four different Dataframes based on the input
val ProjectDf = batchDF.select(...)
val LocationDf = batchDF.select(...)
val ContactDf = batchDF.select(...)
val EmployeeDf = batchDF.select(...)
// then you can save those four Dataframes into the desired locations
ProjectDf.write.format("spark.sqldw").options(.dbo.Project).save(...)
LocationDf.write.format("spark.sqldw").options(.dbo.Loc).save(...)
ContactDf.write.format("spark.sqldw").options(.dbo.Contact).save(...)
EmployeeDf.write.format("spark.sqldw").options(.dbo.Emp).save(...)
// do not forget to unpersist your batchDF
batchDF.unpersist()
}
streamingDF.writeStream.foreachBatch(myFunc _).[...].start()
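As a final note, the write calls in the snippets above only contain placeholders. With the Azure Synapse connector on Databricks (format com.databricks.spark.sqldw), a filled-in write could look roughly like the following sketch; the JDBC URL, storage path and table name here are placeholders, and you should verify the option names against the connector documentation for your runtime version:

// hypothetical filled-in version of the ProjectDf.write call from above
ProjectDf.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>") // placeholder JDBC URL
  .option("tempDir", "abfss://<container>@<account>.dfs.core.windows.net/tempdir")    // staging dir used by the connector
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "dbo.Project") // e.g. dbo.Loc / dbo.Contact / dbo.Emp for the other dataframes
  .mode("append")
  .save()

The same pattern applies to LocationDf, ContactDf and EmployeeDf with their respective dbTable values. Also remember that the streaming query itself still needs a checkpoint location, e.g. .option("checkpointLocation", ...) on the writeStream before .start().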