默认(未指定)触发器如何确定结构化流中微批次的大小?

How does the default (unspecified) trigger determine the size of micro-batches in Structured Streaming?

当Spark Structured Streaming中的query execution没有设置trigger时,

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  //.trigger(???) // <--- Trigger intentionally omitted ----
  .start()

截至 Spark 2.4.3(2019 年 8 月)。 Structured Streaming Programming Guide - Triggers 表示

If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

问题:默认触发器根据什么决定微批次的大小?

说吧。输入源是 Kafka。由于一些中断,工作中断了一天。然后重新启动同一个 Spark 作业。然后它将在它停止的地方使用消息。这是否意味着第一个微批次将是一个巨大的批次,其中有 1 天的消息在作业停止时累积在 Kafka 主题中?假设作业需要 10 小时来处理那个大批量,那么下一个微批处理有 10 小时的消息?并逐渐直到 X 次迭代以赶上积压以达到更小的微批次。

On which basis the default trigger determines the size of the micro-batches?

没有。每个触发器(无论多长)都只是请求输入数据集的所有来源,并且它们提供的任何内容都由操作员在下游处理。消息来源知道该提供什么,因为他们知道到目前为止已经消耗(处理)了什么。

就好像您询问了批处理结构化查询以及这个 "trigger" 请求处理的数据大小(顺便说一句,有 ProcessingTime.Once 触发器)。

Does that mean the first micro-batch will be a gigantic batch with 1 day of msg which accumulated in the Kafka topic while the job was stopped?

几乎(与 Spark Structured Streaming 几乎没有任何关系)。

底层 Kafka 消费者获取处理的记录数由 max.poll.records 和可能由一些其他配置属性配置(参见 )。

由于 Spark Structured Streaming 使用 Kafka 数据源,它只是 Kafka Consumer API 的包装器 API 单个微批处理中发生的任何事情都等同于此单个 Consumer.poll 调用。

您可以使用带有 kafka. 前缀(例如 kafka.bootstrap.servers)的选项配置底层 Kafka 消费者,这些选项被考虑用于驱动程序和执行程序上的 Kafka 消费者。