在结构化流式检查点中,为什么在 foreachbatch 之后不提交偏移量

In structured streaming checkpointing why offsets are not committed after foreachbatch

df
.writeStream
.trigger(Trigger.Once)
.option(checkpointKey, checkpointVal)
.foreachBatch { (batchDF: DataFrame, batchId: Long) => }

这是我运行的示例代码。 观察到结构化流在开头本身创建了偏移量文件:checkpoints/offsets/3

为什么不等待 foreachBatch 完成,然后将偏移量写入检查点目录?

每个微批类似于一个交易。

  1. 当kafka源有新消息时。的开始和结束偏移量 这个 microbatch 将被写入 offsets 文件夹。

  2. 开始处理。

  3. 如果成功,commit文件将被写入commits文件夹 微浴 ID。如果失败,将使用相同的重新执行微批处理 偏移量范围。