如何在 Spark Structured Streaming 中将流式 DataFrame 写入多个接收器

How to write streaming DataFrame into multiple sinks in Spark Structured Streaming

我有一组 SQL 规则,我需要将其应用于 foreachBatch() 内的流数据帧。应用这些规则后,resultant/filtered 数据帧应写入多个目的地,如“delta”和“cosmos DB”。

下面是我试过的: 使用来自 forEachBatch() 方法的静态数据框,我正在尝试创建一个临时视图,如下所示。

df.writeStream
  .format("delta")
  .foreachBatch(writeToDelta _)
  .outputMode("update")
  .start()

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
    microBatchOutputDF.createOrReplaceTempView("testTable")
}

但是 运行 代码显示为 table 或未找到视图 'testTable'。

是否可以在 spark 结构化流中使用静态数据帧创建临时 table/view?

或者如何写入多个接收器?

来自澄清 OP 问题的评论:

"I have a set of SQL rules which I need to apply on the dataframe inside forEachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."

foreachBatch 允许您

  • 重复使用现有的批处理数据源
  • 写入多个位置

在您的情况下,我知道您想对流数据帧应用不同的转换并将其写入多个位置。你可以像下面那样做:

df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

  // persist dataframe in case you are reusing it multiple times
  batchDF.persist()

  // apply SQL logic using `selectExpr` or just the DataFrame API
  val deltaBatchDf = batchDF.selectExpr("") 
  val cosmosBatchDf = batchDF.selectExpr("") 

  // write to multiple sinks like you would do with batch DataFrames
  // add more locations if required
  deltaBatchDf.write.format("delta").options(...).save(...)
  cosmosBatchDf.write.format("cosmos").options(...).save(...)

  // free memory
  batchDF.unpersist()
}