Spark MicroBatchExecution:流式查询取得进展......真的吗?

Spark MicroBatchExecution : Streaming query made progress... Really?

我是 运行 增量流式查询,并且在 "Nothing" 真正发生时,我不断收到无数的更新和 QueryProgressEvent 拦截。

如果没有检测到要处理的行,为什么会触发这些事件?什么被认为是 "progress" ?

对我来说,这只是不需要的日志污染,我必须找到一种方法将其静音,直到 "really" 发生某些事情,但我仍然对原因和方式感到好奇。

20/01/01 23:18:21 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "bca1d3d2-4196-4e89-9dcf-916536bd00a6",
  "runId" : "2e6bfbef-cea1-48dd-b228-39f7fdc09e27",
  "name" : "STREAM_DELTA",
  "timestamp" : "2020-01-01T23:18:21.950Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 1,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/delta/source]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "DeltaSink[/delta/output]"
  }
}

Why are those event fired if no rows were detected to be processed ?

结构化流不是事件驱动的。连续或通过微批处理的结构化流 运行s。

  • 连续:您的直播 运行 不间断。一个 运行 结束后,下一个开始。
  • 微批处理:您的流 运行 秒基于您的触发规则(例如 5 秒)。当一个流 运行 结束时,它会等待 5 秒,直到重新 运行ning.

在任何一种情况下,流都会检查其输入位置是否有任何新文件要处理。如果有新文件,它会按配置处理它们并将文件名写入其检查点,这样这些文件就不会被重新处理为新文件。如果没有新文件,它会完成 运行,因为它认为没有工作要做。这就是为什么即使没有检测到行也会触发这些事件的原因。

What is considered a "progress" ?

进展被视为成功 运行 的结论,如您发布的日志所示。该流由 运行ning 创建 "progress"。