Spark 无法识别提供给 withWatermark() 的事件时间列

Spark can't identify the event time column being supplied to withWatermark()

我试图找到一种方法来使用 Spark 根据事件时间戳重新会话化不同事件中的功能,并且我找到了一个代码示例,该代码示例使用 mapGroupsWithState 在其回购中使用处理时间戳来重新会话化事件。

https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

为了快速测试这个会话化事物是否适用于事件时间戳,我添加了 withWatermark("timestamp", "10 seconds")(将处理时间视为事件时间戳)并将 ProcessingTimeTimeout 更改为 EventTimeTimeout.

  val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

 // Split the lines into words, treat words as sessionId of events
 val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) {
   ...
  }

  // Start running the query that prints the session updates to the console
 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

 query.awaitTermination()

然而,当我 运行 它时,Spark 抛出 org.apache.spark.sql.AnalysisException 并说

Watermark must be specified in the query using '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a [map|flatMap]GroupsWithState. Event-time timeout not supported without watermark`

这不是真的并且令人困惑,因为 'timestamp' 列显然在物理计划中跟随该异常消息:

...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]

我是漏掉了什么还是做错了什么?

提前致谢!

平面图操作后添加水印。 这应该有效:

val events = lines
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }.withWatermark("timestamp", "10 seconds") 

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) {
   ...
  }