我们如何从结构化流中获得小批量时间
How can we get mini-batch time from Structured Streaming
在 Spark 流中,有带有时间参数的 forEachRDD,可以在其中花费该时间并将其用于不同目的 - 元数据,在 rdd 中创建额外的时间列,...
val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { (rdd, time) =>
// update metadata with time
// convert rdd to df and add time column
// write df
}
在结构化流中 API
val df: Dataset[Row] = spark
.readStream
.format("kafka")
.load()
df.writeStream.trigger(...)
.outputMode(...)
.start()
如何才能为结构化流获取类似的 time(小批量时间)数据,以便能够以相同的方式使用它?
我已经搜索了一个函数,它提供了获取 batchTime 的可能性,但在 Spark Structured Streaming API 中似乎还不存在。
这是我用来获取批处理时间(假设批处理间隔为 2000 毫秒)的解决方法,它使用 foreachBatch
允许我们获取 batchId:
val now = java.time.Instant.now
val batchInterval = 2000
df.writeStream.trigger(Trigger.ProcessingTime(batchInterval))
.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
println(now.plusMillis(batchId * batchInterval.milliseconds))
})
.outputMode(...)
.start()
这是输出:
2019-07-29T17:13:19.880Z
2019-07-29T17:13:21.880Z
2019-07-29T17:13:23.880Z
2019-07-29T17:13:25.880Z
2019-07-29T17:13:27.880Z
2019-07-29T17:13:29.880Z
2019-07-29T17:13:31.880Z
2019-07-29T17:13:33.880Z
2019-07-29T17:13:35.880Z
希望对您有所帮助!
在 Spark 流中,有带有时间参数的 forEachRDD,可以在其中花费该时间并将其用于不同目的 - 元数据,在 rdd 中创建额外的时间列,...
val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { (rdd, time) =>
// update metadata with time
// convert rdd to df and add time column
// write df
}
在结构化流中 API
val df: Dataset[Row] = spark
.readStream
.format("kafka")
.load()
df.writeStream.trigger(...)
.outputMode(...)
.start()
如何才能为结构化流获取类似的 time(小批量时间)数据,以便能够以相同的方式使用它?
我已经搜索了一个函数,它提供了获取 batchTime 的可能性,但在 Spark Structured Streaming API 中似乎还不存在。
这是我用来获取批处理时间(假设批处理间隔为 2000 毫秒)的解决方法,它使用 foreachBatch
允许我们获取 batchId:
val now = java.time.Instant.now
val batchInterval = 2000
df.writeStream.trigger(Trigger.ProcessingTime(batchInterval))
.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
println(now.plusMillis(batchId * batchInterval.milliseconds))
})
.outputMode(...)
.start()
这是输出:
2019-07-29T17:13:19.880Z
2019-07-29T17:13:21.880Z
2019-07-29T17:13:23.880Z
2019-07-29T17:13:25.880Z
2019-07-29T17:13:27.880Z
2019-07-29T17:13:29.880Z
2019-07-29T17:13:31.880Z
2019-07-29T17:13:33.880Z
2019-07-29T17:13:35.880Z
希望对您有所帮助!