附加模式下的 Spark 结构化流每次输出多行 window

Spark structured streaming in append mode outputting many rows per single time window

我正在使用 Apache Spark 编写持续应用程序。在结构化流式传输案例中,我试图从 Delta table 读取,通过时间 window 对事件时间执行流式聚合,并以追加模式将结果写入 Delta table .根据文档,我的期望是在追加模式下,只有一段时间 window 的最终聚合将被写入接收器。这不是我的经验。相反,我在我的目标 Delta table 中看到如下记录,独立于我尝试过的流配置(windowDuration=5 分钟,slideDuration=20 秒)。

Example output from stream

从上图可以看出,同一时间window正在向sink贡献多条记录。我确认每个微批最多输出一个时间 window 的一条记录,但是一个时间 window 可以贡献许多(数量上不明显一致)微批的输出记录。下面是流式聚合的核心代码。

output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
                          .withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
                                                              windowDuration=f"{analysis_window_length_secs} seconds",
                                                              slideDuration=f"{analysis_window_hop_size_secs} seconds"))
                          .groupBy('time_window')
                          .applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))

Pandas UDF 创建了一些保存标量值的变量,构建了一个形状为 [1,N] 的 Pandas DataFrame,并将其作为结果输出。也就是说,它返回一行。我唯一要分组的是时间 window。我怎么能同时获得多条记录 window?我已经以多种方式创建和关闭流,并且每次都收到相同的结果(例如,根据 Delta Lake docs, per the structured streaming guide,以及 read/load/table/toTable API 选项,尝试我能找到的每个选项配置... 是的,许多小时的蛮力)。我还尝试了水印持续时间和触发周期的各种值范围; none 产生了影响。

这是附加模式下的预期行为吗(即同时有多个记录window)?

编辑:我使用的是 Databricks 运行时版本 8.3 ML。它有 Spark 版本“3.1.1”。

编辑 2:我正在暂时考虑这个问题是否相关:https://issues.apache.org/jira/browse/SPARK-25756

为了避免加入 unanswered/followed 的大军,我会在下面记下我的初步结论,如果我了解更多,我会更新它。这可能是错误的。请不要让这阻止其他 answers/comments.

总的来说,这不是预期的行为。每个微批次被单独发送到 Pandas UDF(即,在每次触发当前微批次时,只有那个微批次被发送到 UDF)并导致结果 table 中的记录被发送尽管处于追加模式,但仍然到接收器。开发人员已注意到该问题,并且至少创建了一个 JIRA 问题来解决它。此工作线程似乎处于非活动状态。

其他数据点和建议:

  • 不同论坛(例如 Databricks)中的多个问题,以及上面链接的 JIRA 问题,直接参考,或提供 Spark 中此错误的明确示例。
  • 该问题自 2018 年以来一直存在,似乎针对版本 3.1.2 进行了修复,但 JIRA 问题已批量关闭,我看不到 discussion/work 的继续。
  • Spark Structured Streaming 目前,对于 Python 开发人员,仅支持对流聚合进行简单的数据转换(即,除了 apply 或 applyInPandas).
  • 如果您正在寻找用于非平凡应用程序的流式计算引擎,在解决此问题之前不要指望 Python Spark API 提供支持。

非常有兴趣了解潜在的解决方法,或者我是否得出了上面的错误结论。干杯。