flatMapGroupsWithState 中 OutputMode 的作用是什么? How/where用过吗?

What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?

我正在探索 KeyValueGroupedDataset.flatMapGroupsWithState Spark Structured Streaming 中的任意状态聚合。

KeyValueGroupedDataset.flatMapGroupsWithState运算符的签名如下:

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

OutputMode 参数的目的是什么?

在查看(作为底层物理运算符的 FlatMapGroupsWithStateExec 的源代码时,我找不到任何可以使用 OutputMode 的地方。

确实,我也没有发现任何用途。我对此有几个理论:

  1. 这里的模式与org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState逻辑运算符的签名保持一致。如果您检查 org.apache.spark.sql.execution.SparkStrategies.BasicOperators apply 方法,您会注意到逻辑运算符经常将其所有参数传递给物理运算符。我不确定,但它看起来像一个设计指南,但这只是我的假设。

  2. 也可能是遗留原因。 FlatMapGroupsWithStateMapGroupsWithState 演变而来,以强制执行输出模式语义。它是在这个 PR 中实现的: https://github.com/apache/spark/pull/17197/files (SPARK-19858) 将 MapGroupsWithState 重命名为 FlatMapGroupsWithState 并添加 outputMode 作为参数。也许 - 如果我之前的理论是错误的 - 它只是在这里因为它通过了 PR 并且没有人想抱怨它因为 "it was already here" 原则?

  3. 也许以后会传一个outputMode给映射函数?我发现用于保存流式聚合的节点 (StateStoreSaveExec) 使用输出模式来确定要持久保存在状态存储中的条目。也许这将是为 *withState 转换添加的新功能,如评论 btw 所述:

    • @param outputMode the output mode of func