Spark Structured Streaming 中同一 Dataframe/Dataset 上的多个 operations/aggregations

Multiple operations/aggregations on the same Dataframe/Dataset in Spark Structured Streaming

我使用的是 Spark 2.3.2。

我正在从 Kafka 接收数据。我必须对同一数据进行多次聚合。然后所有聚合结果将转到 相同的数据库(列或表可能会更改)。例如:

val kafkaSource = spark.readStream.option("kafka") ...
val agg1 = kafkaSource.groupBy().agg ...
val agg2 = kafkaSource.groupBy().mapgroupswithstate() ...
val agg3 = kafkaSource.groupBy().mapgroupswithstate() ...

但是当我尝试为每个聚合结果调用 writeStream 时:

aggr1.writeStream().foreach().start()
aggr2.writeStream().foreach().start()
aggr3.writeStream().foreach().start()

Spark在每个writeStream中独立接收数据。这种方式有效率吗?

我可以用一个writeStream做多个聚合吗?如果可以,这种方式效率高吗?

可以使用缓存来避免多次读取:

kafkaSource.writeStream.foreachBatch((df, id) => {
  df.persist()
  val agg1 = df.groupBy().agg ...
  val agg2 = df.groupBy().mapgroupswithstate() ...
  val agg3 = df.groupBy().mapgroupswithstate() ...
  df.unpersist()
}).start()

每个“writestream”操作都会产生一个新的流式查询。每个流式查询将从源读取并执行整个查询计划。与 DStream 不同,没有 cache/persist 选项可用。

在 spark 2.4 中,引入了一个新的API“forEachBatch”来更有效地解决此类场景。