Find the last time an event occurred within a time window for each group with Spark/Scala

I want to find when the last/previous login attempt occurred for a specific (user, device) pair, based on a timestamp window.

For example my initial dataset looks like this:

+--------+-------+-------------------+-------+
|username| device|         attempt_at|   stat|
+--------+-------+-------------------+-------+
|   user1|     pc|2018-01-02 07:44:27| failed|
|   user1|     pc|2018-01-02 07:44:10|Success|
|   user2| iphone|2017-12-23 16:58:08|Success|
|   user2| iphone|2017-12-23 16:58:30|Success|
|   user2| iphone|2017-12-23 16:58:50| failed|
|   user1|android|2018-01-02 07:44:37| failed|
|   user1|android|2018-01-05 08:33:47| failed|
+--------+-------+-------------------+-------+

//code
val df1 = sc.parallelize(Seq(
  ("user1", "pc", "2018-01-02 07:44:27", "failed"),
  ("user1", "pc", "2018-01-02 07:44:10", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:30", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:50", "failed"),
  ("user1", "android", "2018-01-02 07:44:37", "failed"),
  ("user1", "android", "2018-01-05 08:33:47", "failed")
)).toDF("username", "device", "attempt_at", "stat")

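What I want

I want 1-hour and 7-day windows in which I can find the previous attempt timestamp for each specific user and device. Basically, group by user and device.

For example, for 'user1' and device 'pc' in the dataset above, the previous attempt for both the 1-hour and the 7-day window would be '2018-01-02 07:44:27'.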

But for 'user1' and device 'android', the previous attempt within 7 days would be '2018-01-02 07:44:37', while the 1-hour window has none, because user1 made no attempt on android within the preceding hour.

Expected output dataset

// 1 hr window for last known attempt
+--------+-------+---------------------+--------------------+
|username| device|           attempt_at| previous_attempt_at|
+--------+-------+---------------------+--------------------+
|   user1|     pc|  2018-01-02 07:44:10| 2018-01-02 07:44:27|
|   user2| iphone|  2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+-------+---------------------+--------------------+

// 7 days window for last known attempt
+--------+--------+---------------------+--------------------+
|username| device |           attempt_at| previous_attempt_at|
+--------+--------+---------------------+--------------------+
|   user1|     pc |  2018-01-02 07:44:10| 2018-01-02 07:44:27|
|   user1| android|  2018-01-05 08:33:47| 2018-01-02 07:44:37|
|   user2|  iphone|  2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+--------+---------------------+--------------------+

What I tried:

I tried a 1-hour window with 'last'. It returns the timestamp of the current row within the window, not that of the previous row.
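import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// Frame: rows of the same (username, device) whose timestamp lies
// between 3600 seconds before the current row and the current row itself.
val w = Window.partitionBy("username", "device")
              .orderBy(col("attempt_at").cast("timestamp").cast("long"))
              .rangeBetween(-3600, 0)

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))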

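Replace .rangeBetween(-3600, 0) with .rangeBetween(-3600, -1).

An upper bound of 0 is CURRENT ROW, so the current row is always the last row in the frame and 'last' returns it. Ending the frame at -1 (one second before the current row) excludes the current row, so 'last' returns the most recent earlier attempt instead.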
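
To make the change concrete, here is a minimal sketch that applies the -1 upper bound to both window sizes from the question. The object name PreviousAttempt, the helper withPreviousAttempt, and the local SparkSession settings are assumptions made for illustration; they do not come from the original post. Rows with no earlier attempt inside the frame get a null previous_attempt_at, which the sketch filters out.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

object PreviousAttempt {

  // Hypothetical helper (not from the original post): for every row, add the
  // most recent earlier attempt by the same (username, device) that happened
  // at most `seconds` seconds before it, or null if there is none.
  def withPreviousAttempt(df: DataFrame, seconds: Long): DataFrame = {
    val w = Window
      .partitionBy("username", "device")
      .orderBy(col("attempt_at").cast("timestamp").cast("long")) // epoch seconds
      .rangeBetween(-seconds, -1) // upper bound -1 excludes the current row
    df.withColumn("previous_attempt_at", last("attempt_at").over(w))
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("previous-attempt")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("user1", "pc", "2018-01-02 07:44:27", "failed"),
      ("user1", "pc", "2018-01-02 07:44:10", "Success"),
      ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
      ("user2", "iphone", "2017-12-23 16:58:30", "Success"),
      ("user2", "iphone", "2017-12-23 16:58:50", "failed"),
      ("user1", "android", "2018-01-02 07:44:37", "failed"),
      ("user1", "android", "2018-01-05 08:33:47", "failed")
    ).toDF("username", "device", "attempt_at", "stat")

    // 1-hour window: keep only rows that have an earlier attempt in the frame.
    withPreviousAttempt(df1, 3600L)
      .filter(col("previous_attempt_at").isNotNull)
      .show(false)

    // 7-day window.
    withPreviousAttempt(df1, 7L * 24 * 3600)
      .filter(col("previous_attempt_at").isNotNull)
      .show(false)

    spark.stop()
  }
}
```

Because the frame is ordered ascending by time, 'last' picks the latest row in it, which is the closest earlier attempt. Note that this produces one row per attempt that has a predecessor, so reducing it to a single row per (username, device) as in the expected tables would need an additional step.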