Spark Structured Streaming 中同一 Dataframe/Dataset 上的多个 operations/aggregations
Multiple operations/aggregations on the same Dataframe/Dataset in Spark Structured Streaming
我使用的是 Spark 2.3.2。
我正在从 Kafka 接收数据。我必须对同一数据进行多次聚合。然后所有聚合结果将转到 相同的数据库(列或表可能会更改)。例如:
val kafkaSource = spark.readStream.option("kafka") ...
val agg1 = kafkaSource.groupBy().agg ...
val agg2 = kafkaSource.groupBy().mapgroupswithstate() ...
val agg3 = kafkaSource.groupBy().mapgroupswithstate() ...
但是当我尝试为每个聚合结果调用 writeStream 时:
aggr1.writeStream().foreach().start()
aggr2.writeStream().foreach().start()
aggr3.writeStream().foreach().start()
Spark在每个writeStream中独立接收数据。这种方式有效率吗?
我可以用一个writeStream做多个聚合吗?如果可以,这种方式效率高吗?
可以使用缓存来避免多次读取:
kafkaSource.writeStream.foreachBatch((df, id) => {
df.persist()
val agg1 = df.groupBy().agg ...
val agg2 = df.groupBy().mapgroupswithstate() ...
val agg3 = df.groupBy().mapgroupswithstate() ...
df.unpersist()
}).start()
每个“writestream”操作都会产生一个新的流式查询。每个流式查询将从源读取并执行整个查询计划。与 DStream 不同,没有 cache/persist 选项可用。
在 spark 2.4 中,引入了一个新的API“forEachBatch”来更有效地解决此类场景。
我使用的是 Spark 2.3.2。
我正在从 Kafka 接收数据。我必须对同一数据进行多次聚合。然后所有聚合结果将转到 相同的数据库(列或表可能会更改)。例如:
val kafkaSource = spark.readStream.option("kafka") ...
val agg1 = kafkaSource.groupBy().agg ...
val agg2 = kafkaSource.groupBy().mapgroupswithstate() ...
val agg3 = kafkaSource.groupBy().mapgroupswithstate() ...
但是当我尝试为每个聚合结果调用 writeStream 时:
aggr1.writeStream().foreach().start()
aggr2.writeStream().foreach().start()
aggr3.writeStream().foreach().start()
Spark在每个writeStream中独立接收数据。这种方式有效率吗?
我可以用一个writeStream做多个聚合吗?如果可以,这种方式效率高吗?
可以使用缓存来避免多次读取:
kafkaSource.writeStream.foreachBatch((df, id) => {
df.persist()
val agg1 = df.groupBy().agg ...
val agg2 = df.groupBy().mapgroupswithstate() ...
val agg3 = df.groupBy().mapgroupswithstate() ...
df.unpersist()
}).start()
每个“writestream”操作都会产生一个新的流式查询。每个流式查询将从源读取并执行整个查询计划。与 DStream 不同,没有 cache/persist 选项可用。
在 spark 2.4 中,引入了一个新的API“forEachBatch”来更有效地解决此类场景。