如何为 Spark Structured Streaming 中的每个微批次写入记录生成时间戳?
How to generate a timestamp for each microbatch of written records in Spark Structured Streaming?
我正在从 Kinesis 读取数据并通过 Spark 结构化流将其写入 ElasticEearch。我需要将每个微批次写入 ElasticSearch 索引的时间戳存储为每个记录中字段的一部分。
例如,流中的第一个微批包含 10K 条记录,这 10K 条记录的时间戳应反映它们被处理(或写入 ElasticSearch)的时刻。然后我们应该在处理第二个微批次时有一个新的时间戳,依此类推。
我尝试使用 current_timestamp 函数添加一个新列:
.withColumn("recordDate", current_timestamp())
但看起来该函数在整个查询生命周期中只被评估一次。因此,所有存储的记录都将具有相同的时间戳,指示查询开始的时间。所以这个时间戳似乎代表 "query start datetime" 而不是代表 "record datetime".
的期望时间戳
如果有人能帮助解释这是如何实现的,那就太好了。
非常感谢
您可以使用如下所示的 udf 执行此操作,您也可以添加自己的格式,
import org.apache.spark.sql.functions.udf
def current_time = udf(() => {
java.time.LocalDateTime.now().toString
})
要使用它,
val streamingDF = ???
val streamingDFWithTime .withColumn("time", current_time()))
streamingDFWithTime.writeStream
...
...
PS:我使用 udf 代替内置 current_timestamp
,因为直接在流上使用它会导致讨论的问题 here and here
希望对您有所帮助。
我正在从 Kinesis 读取数据并通过 Spark 结构化流将其写入 ElasticEearch。我需要将每个微批次写入 ElasticSearch 索引的时间戳存储为每个记录中字段的一部分。
例如,流中的第一个微批包含 10K 条记录,这 10K 条记录的时间戳应反映它们被处理(或写入 ElasticSearch)的时刻。然后我们应该在处理第二个微批次时有一个新的时间戳,依此类推。
我尝试使用 current_timestamp 函数添加一个新列:
.withColumn("recordDate", current_timestamp())
但看起来该函数在整个查询生命周期中只被评估一次。因此,所有存储的记录都将具有相同的时间戳,指示查询开始的时间。所以这个时间戳似乎代表 "query start datetime" 而不是代表 "record datetime".
的期望时间戳如果有人能帮助解释这是如何实现的,那就太好了。
非常感谢
您可以使用如下所示的 udf 执行此操作,您也可以添加自己的格式,
import org.apache.spark.sql.functions.udf
def current_time = udf(() => {
java.time.LocalDateTime.now().toString
})
要使用它,
val streamingDF = ???
val streamingDFWithTime .withColumn("time", current_time()))
streamingDFWithTime.writeStream
...
...
PS:我使用 udf 代替内置 current_timestamp
,因为直接在流上使用它会导致讨论的问题 here and here
希望对您有所帮助。