Spark can't identify the event time column being supplied to withWatermark()
I'm trying to find a way to re-sessionize features across different events based on event timestamps with Spark, and I found a code example in the Spark repo that uses mapGroupsWithState to re-sessionize events using processing timestamps.
To quickly test whether this sessionization works with event timestamps, I added withWatermark("timestamp", "10 seconds") (treating the processing time as the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout:
val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

// Split the lines into words, treat words as sessionId of events
val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }

val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
    ...
  }

// Start running the query that prints the session updates to the console
val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()
However, when I run it, Spark throws an org.apache.spark.sql.AnalysisException saying:
Watermark must be specified in the query using '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a [map|flatMap]GroupsWithState. Event-time timeout not supported without watermark
This isn't true and is confusing, because the 'timestamp' column clearly appears in the plan printed right after that exception message:
...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]
Am I missing something or doing something wrong?
Thanks in advance!
Add the watermark after the flatMap operation. The typed flatMap deserializes and re-serializes the rows, producing new output columns, so the event-time marker that withWatermark set upstream is not carried through to the timestamp column that mapGroupsWithState actually sees. Applying withWatermark after the flatMap tags the right column.
This should work:
val events = lines
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }
  .withWatermark("timestamp", "10 seconds")

val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
    ...
  }
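For completeness, here is a sketch of what the elided update function could look like with EventTimeTimeout. The Event, SessionInfo, and SessionUpdate case classes and their fields are assumptions (the question never shows them), modeled loosely on Spark's sessionization example; the one real API constraint is that with EventTimeTimeout you must call state.setTimeoutTimestamp, whose deadline is compared against the watermark, rather than setTimeoutDuration:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical case classes -- the question does not show their definitions.
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long)
case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)

// Sketch of an update function usable as the second argument list of
// mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout)
def updateSession(
    sessionId: String,
    events: Iterator[Event],
    state: GroupState[SessionInfo]): SessionUpdate = {
  if (state.hasTimedOut) {
    // The watermark passed the timeout timestamp: close the session.
    val finished = state.get
    state.remove()
    SessionUpdate(sessionId, finished.endTimestampMs - finished.startTimestampMs,
      finished.numEvents, expired = true)
  } else {
    // Fold the new batch of events into the existing session state.
    val timestamps = events.map(_.timestamp.getTime).toSeq
    val updated = state.getOption
      .map(s => SessionInfo(s.numEvents + timestamps.size,
        s.startTimestampMs, math.max(s.endTimestampMs, timestamps.max)))
      .getOrElse(SessionInfo(timestamps.size, timestamps.min, timestamps.max))
    state.update(updated)
    // Event-time timeout: fires once the watermark exceeds this timestamp.
    state.setTimeoutTimestamp(updated.endTimestampMs, "10 seconds")
    SessionUpdate(sessionId, updated.endTimestampMs - updated.startTimestampMs,
      updated.numEvents, expired = false)
  }
}
```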