如何为 Spark Structured Streaming 中的每个微批次写入记录生成时间戳?

How to generate a timestamp for each microbatch of written records in Spark Structured Streaming?

我正在从 Kinesis 读取数据并通过 Spark 结构化流将其写入 ElasticEearch。我需要将每个微批次写入 ElasticSearch 索引的时间戳存储为每个记录中字段的一部分。

例如,流中的第一个微批包含 10K 条记录,这 10K 条记录的时间戳应反映它们被处理(或写入 ElasticSearch)的时刻。然后我们应该在处理第二个微批次时有一个新的时间戳,依此类推。

我尝试使用 current_timestamp 函数添加一个新列:

.withColumn("recordDate", current_timestamp())

但看起来该函数在整个查询生命周期中只被评估一次。因此,所有存储的记录都将具有相同的时间戳,指示查询开始的时间。所以这个时间戳似乎代表 "query start datetime" 而不是代表 "record datetime".

的期望时间戳

如果有人能帮助解释这是如何实现的,那就太好了。

非常感谢

您可以使用如下所示的 udf 执行此操作,您也可以添加自己的格式,

import org.apache.spark.sql.functions.udf

 def current_time = udf(() => {
    java.time.LocalDateTime.now().toString
  })

要使用它,

val streamingDF = ???
val streamingDFWithTime .withColumn("time", current_time()))
streamingDFWithTime.writeStream
...
...

PS:我使用 udf 代替内置 current_timestamp,因为直接在流上使用它会导致讨论的问题 here and here

希望对您有所帮助。