删除关闭时间戳的条目

Dropping entries of close timestamps

我想删除所有重复条目的记录,但我说过时间戳的差异可以是任何时间量的偏移量,但为简单起见,将使用 2 分钟。

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:21|ABC  |DEF |
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+

我希望我的数据框只有行

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+

我试过类似的方法,但这并没有删除重复项。

    val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"), col("df1.ColA") === col("df2.ColA") &&
      col("df1.ColB") === col("df2.ColB") && 
      && abs(unix_timestamp(col("Date")) - unix_timestamp(col("Date"))) > offset
      )

目前,我只是在此处 根据特定列的数据选择不同的或一组,但我想要一个更强大的解决方案,原因是该间隔之外的数据可能是有效数据。此外,可以更改偏移量,因此可能会在 5 秒或 5 分钟内发生变化,具体取决于要求。

有人向我提到过创建一个 UDF 来比较日期,如果所有其他列都相同,但我不确定具体该怎么做,这样我要么过滤掉行,要么添加一个标志,然后删除这些行任何帮助将不胜感激。

类似的 sql 问题在这里 Duplicate entries with different timestamp

谢谢!

我会这样做:

  1. 定义一个 Window 以在虚拟列上排序日期。
  2. 添加一个虚拟列,并向其添加一个常量值。
  3. 添加包含上一条记录日期的新列。
  4. 计算该日期与前一个日期的差值。
  5. 根据差值过滤您的记录。

代码可以如下所示:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.partitionBy("dummy").orderBy("Date") // step 1

df.withColumn("dummy", lit(1)) // this is step 1
  .withColumn("previousDate", lag($"Date", 1) over w) // step 2
  .withColumn("difference", unix_timestamp($"Date") - unix_timestamp("previousDate")) // step 3

如果您有成对的记录可能在时间上接近,则上述解决方案有效。如果您有两条以上的记录,您可以将每条记录与 window 中的第一条记录(而不是前一条)进行比较,因此不要使用 lag($"Date",1),而是使用 first($"Date")。在这种情况下,'difference' 列包含当前记录与 window.

中第一条记录之间的时间差