按键分组并使用 Spark/Scala 有效地找到在特定时间 window 发生的事件的先前时间戳

Group by key and find the previous timestamp of an event that occured in a specific time window efficiently with Spark/Scala

注意:我的分组最多可以包含每组 5-10K 行以进行聚合。因此非常需要一个高效的代码。

我的数据

val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

我想要的
事件发生的最近时间按(用户名、设备)分组。

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

所需输出中的异常:
现在因为我提到它必须在特定时间 window 例如在下面的输入数据集中最后一行有 12 月 23 日的最新日期时间戳。现在,如果我想要返回 1 天的特定时间 window 并给我最后一次尝试,则 'previous_attempt_at' 列将为空,因为没有事件前一天应该是 1 月 22 日。这完全取决于输入时间戳范围。

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
A grouping by (username,device) for the latest time an event occurred.

    +--------+------+-------------------+-------+-------------------+
    |username|device|         attempt_at|   stat|previous_attempt_at|
    +--------+------+-------------------+-------+-------------------+
    |   user2|iphone|2017-12-23 16:58:45|Success|               null|
    +--------+------+-------------------+-------+-------------------+

我有什么

val w = (Window.partitionBy("username", "device")
                 .orderBy(col("attempt_at").cast("timestamp").cast("long"))
                   .rangeBetween(-3600, -1)
                 )

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

注释。 我的代码对特定用户分组中的每一行进行 windowing。 在处理大量数据时效率非常低,也没有给出最新的尝试。除了最后一行,我不需要所有行。

您只需要一个额外的 groupByaggregation 但在此之前,您需要 collect_list 函数来累积收集以前的日期和 udf函数检查前面的attempt_at在时间限制内将三列"attempt_at", "stat", "previous_attempt_at")转换为struct 用于选择最后一个作为

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String])=> {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  val diffDates = timestamps.init.filter(x => LocalDateTime.from(LocalDateTime.parse(x, formatter)).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if(diffDates.size > 0) diffDates.last else null
})

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("username", "device").orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1.withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  .withColumn("struct", struct(col("attempt_at").cast("timeStamp").as("attempt_at"),col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))

这应该给你后面的例子

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
+--------+------+---------------------+-------+-------------------+

和后面的对于前面输入的data

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+

并且您可以通过将 udf 函数中的 ChronoUnit.DAYS 更改为 ChronoUnit.HOURS 等来更改小时的逻辑