如何在 Spark Structured Streaming 中将流式 DataFrame 写入多个接收器
How to write streaming DataFrame into multiple sinks in Spark Structured Streaming
我有一组 SQL 规则,我需要将其应用于 foreachBatch()
内的流数据帧。应用这些规则后,resultant/filtered 数据帧应写入多个目的地,如“delta”和“cosmos DB”。
下面是我试过的:
使用来自 forEachBatch()
方法的静态数据框,我正在尝试创建一个临时视图,如下所示。
df.writeStream
.format("delta")
.foreachBatch(writeToDelta _)
.outputMode("update")
.start()
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("testTable")
}
但是 运行 代码显示为 table 或未找到视图 'testTable'。
是否可以在 spark 结构化流中使用静态数据帧创建临时 table/view?
或者如何写入多个接收器?
来自澄清 OP 问题的评论:
"I have a set of SQL rules which I need to apply on the dataframe inside forEachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."
foreachBatch 允许您
- 重复使用现有的批处理数据源
- 写入多个位置
在您的情况下,我知道您想对流数据帧应用不同的转换并将其写入多个位置。你可以像下面那样做:
df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// persist dataframe in case you are reusing it multiple times
batchDF.persist()
// apply SQL logic using `selectExpr` or just the DataFrame API
val deltaBatchDf = batchDF.selectExpr("")
val cosmosBatchDf = batchDF.selectExpr("")
// write to multiple sinks like you would do with batch DataFrames
// add more locations if required
deltaBatchDf.write.format("delta").options(...).save(...)
cosmosBatchDf.write.format("cosmos").options(...).save(...)
// free memory
batchDF.unpersist()
}
我有一组 SQL 规则,我需要将其应用于 foreachBatch()
内的流数据帧。应用这些规则后,resultant/filtered 数据帧应写入多个目的地,如“delta”和“cosmos DB”。
下面是我试过的:
使用来自 forEachBatch()
方法的静态数据框,我正在尝试创建一个临时视图,如下所示。
df.writeStream
.format("delta")
.foreachBatch(writeToDelta _)
.outputMode("update")
.start()
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("testTable")
}
但是 运行 代码显示为 table 或未找到视图 'testTable'。
是否可以在 spark 结构化流中使用静态数据帧创建临时 table/view?
或者如何写入多个接收器?
来自澄清 OP 问题的评论:
"I have a set of SQL rules which I need to apply on the dataframe inside forEachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."
foreachBatch 允许您
- 重复使用现有的批处理数据源
- 写入多个位置
在您的情况下,我知道您想对流数据帧应用不同的转换并将其写入多个位置。你可以像下面那样做:
df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// persist dataframe in case you are reusing it multiple times
batchDF.persist()
// apply SQL logic using `selectExpr` or just the DataFrame API
val deltaBatchDf = batchDF.selectExpr("")
val cosmosBatchDf = batchDF.selectExpr("")
// write to multiple sinks like you would do with batch DataFrames
// add more locations if required
deltaBatchDf.write.format("delta").options(...).save(...)
cosmosBatchDf.write.format("cosmos").options(...).save(...)
// free memory
batchDF.unpersist()
}