使用 Spark structured streaming 时,如何像 Spark Streaming 一样只获取当前批次的聚合结果?

When using Spark structured streaming , how to just get the aggregation result of current batch, like Spark Streaming?

Spark Structure Streaming (SSS) 和 Spark Streaming (SS) 之间的一大不同是 SSS 可以利用 statestore。它可以存储以前批次的聚合结果,并将当前结果与以前的结果一起应用。所以它可以从输入流的最开始得到真正的聚合结果。

但是对于一种情况,我们不想得到与 statestore 的先前值合并的最终结果。我们只想获取(输出)当前批次的聚合结果。由于平台和框架的原因,我们无法回滚到 SS。

所以我的问题是,在 SSS 中是否仍然可以像 SS 一样获取当前批次的聚合结果?

以spark structure streaming guide中给出的字数统计应用为例: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

当一批出现“cat cat”时,我的预期输出是cat|2

当下一批出现“cat”时,我的预期输出是cat|1

is it still doable in SSS to get the aggretation result of current batch, like SS?

一种实现您想要的方法是使用 mapGroupsWithState 自己控制状态存储,并将其用作一种实际上不执行任何操作的退化存储。例如:

val spark =
  SparkSession.builder().appName("bla").master("local[*]").getOrCreate()

import spark.implicits._

val socketDF = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", 9999)
  .load()

socketDF
  .as[String]
  .map { str =>
    val Array(key, value) = str.split(';')
    (key, value)
  }
  .groupByKey { case (key, _) => key }
  .mapGroupsWithState((str: String,
                       tuples: Iterator[(String, String)],
                       value: GroupState[Int]) => {
    (str, tuples.size)
  })
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()

假设我有一个 key;value 格式的值流,这将只使用 mapGroupsWithState 作为传递存储,实际上不会累积任何结果。这样,对于每个批次,您都会获得一个没有以前聚合数据的干净状态。

使用追加输出模式怎么样?

Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

在 Spark 2.4 中,似乎有一种更简单的方法可以实现这一点,即使用

foreachBatch

操作,您可以在 Spark 文档中阅读。

但是我用的是2.3版本的Spark,一直没有解决这个问题