我们如何从结构化流中获得小批量时间

How can we get mini-batch time from Structured Streaming

在 Spark 流中,有带有时间参数的 forEachRDD,可以在其中花费该时间并将其用于不同目的 - 元数据,在 rdd 中创建额外的时间列,...

val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { (rdd, time) => 
  // update metadata with time 
  // convert rdd to df and add time column
  // write df
 }    

在结构化流中 API

val df: Dataset[Row] = spark
  .readStream
  .format("kafka")
  .load()

df.writeStream.trigger(...)
  .outputMode(...)
  .start()

如何才能为结构化流获取类似的 time(小批量时间)数据,以便能够以相同的方式使用它?

我已经搜索了一个函数,它提供了获取 batchTime 的可能性,但在 Spark Structured Streaming API 中似乎还不存在。

这是我用来获取批处理时间(假设批处理间隔为 2000 毫秒)的解决方法,它使用 foreachBatch 允许我们获取 batchId:

val now = java.time.Instant.now
val batchInterval = 2000
df.writeStream.trigger(Trigger.ProcessingTime(batchInterval))
  .foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
     println(now.plusMillis(batchId * batchInterval.milliseconds))
  })
  .outputMode(...)
  .start()

这是输出:

2019-07-29T17:13:19.880Z 2019-07-29T17:13:21.880Z 2019-07-29T17:13:23.880Z 2019-07-29T17:13:25.880Z 2019-07-29T17:13:27.880Z 2019-07-29T17:13:29.880Z 2019-07-29T17:13:31.880Z 2019-07-29T17:13:33.880Z 2019-07-29T17:13:35.880Z

希望对您有所帮助!