Spark Structured Streaming Trigger.ProcessingTime 计时的准确性

Accuracy of timing of the Trigger.ProcessingTime for Spark Structured Streaming

我有一个用于 kafka 数据结构化流的 spark 作业。 基本代码如下

val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if(!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      // ... Processing batchDF and populate a static dataframe
      batchDF.unpersist()
    }
  }
  .start()

while(rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  // Periodically load data from database
}

基本思路是在 120 秒内流式传输 kafka 数据window,处理微批数据并填充静态数据帧。

根据我的理解,按照这种设计,微批应该每 120 秒到达一次,并且 batchDF 包含这段时间内摄取的数据 window。

但是根据我对printf语句的microbatch到达时间的监测。我发现了以下输出。

At 1594968139, the microbatch has 110 records
At 1594968242, the microbatch has 118 records
At 1594968380, the microbatch has 243 records
At 1594968483, the microbatch has 117 records
At 1594968602, the microbatch has 59 records

似乎微批次的相邻到达时间之间的增量不是很准确,为 120 秒。有时大于120s,有时小于120s

正常吗?如何理解Trigger.ProcessingTime指定的时间?如何获得更准确的时间window?

另外,由于这种不准确,是否会导致微批次的一些数据丢失?我的意思是,有些数据永远不会被任何微批次捕获?

"Is it normal? How to understand the time specified by Trigger.ProcessingTime?"

是的,这是正常的。请记住,配置的 Trigger 会触发流作业的 整个查询 而不是 单独的 foreachBatch 方法。由于您通常有不同数量的记录和不同的处理持续时间,因此您 foreachBatch 调用中的实际写入也不会是固定时间。

"How to get a more accurate time window?"

触发器工作非常准确,您可以考虑另一种测量触发时间的方法,例如在查询的最开始检查时间(在 readStream 调用之后)。

"In addition, due to this inaccuracy, will it cause some data loss for the microbatches? By this I mean, some data is never captured by any microbatches?"

没有,没有数据丢失。