删除关闭时间戳的条目
Dropping entries of close timestamps
我想删除所有重复条目的记录,但我说过时间戳的差异可以是任何时间量的偏移量,但为简单起见,将使用 2 分钟。
+-------------------+-----+----+
|Date |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:21|ABC |DEF |
|2017-07-04 18:50:26|ABC |DEF |
|2017-07-04 18:50:21|ABC |KLM |
+-------------------+-----+----+
我希望我的数据框只有行
+-------------------+-----+----+
|Date |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:26|ABC |DEF |
|2017-07-04 18:50:21|ABC |KLM |
+-------------------+-----+----+
我试过类似的方法,但这并没有删除重复项。
val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"), col("df1.ColA") === col("df2.ColA") &&
col("df1.ColB") === col("df2.ColB") &&
&& abs(unix_timestamp(col("Date")) - unix_timestamp(col("Date"))) > offset
)
目前,我只是在此处 根据特定列的数据选择不同的或一组,但我想要一个更强大的解决方案,原因是该间隔之外的数据可能是有效数据。此外,可以更改偏移量,因此可能会在 5 秒或 5 分钟内发生变化,具体取决于要求。
有人向我提到过创建一个 UDF 来比较日期,如果所有其他列都相同,但我不确定具体该怎么做,这样我要么过滤掉行,要么添加一个标志,然后删除这些行任何帮助将不胜感激。
类似的 sql 问题在这里 Duplicate entries with different timestamp
谢谢!
我会这样做:
- 定义一个 Window 以在虚拟列上排序日期。
- 添加一个虚拟列,并向其添加一个常量值。
- 添加包含上一条记录日期的新列。
- 计算该日期与前一个日期的差值。
- 根据差值过滤您的记录。
代码可以如下所示:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("dummy").orderBy("Date") // step 1
df.withColumn("dummy", lit(1)) // this is step 1
.withColumn("previousDate", lag($"Date", 1) over w) // step 2
.withColumn("difference", unix_timestamp($"Date") - unix_timestamp("previousDate")) // step 3
如果您有成对的记录可能在时间上接近,则上述解决方案有效。如果您有两条以上的记录,您可以将每条记录与 window 中的第一条记录(而不是前一条)进行比较,因此不要使用 lag($"Date",1)
,而是使用 first($"Date")
。在这种情况下,'difference' 列包含当前记录与 window.
中第一条记录之间的时间差
我想删除所有重复条目的记录,但我说过时间戳的差异可以是任何时间量的偏移量,但为简单起见,将使用 2 分钟。
+-------------------+-----+----+
|Date |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:21|ABC |DEF |
|2017-07-04 18:50:26|ABC |DEF |
|2017-07-04 18:50:21|ABC |KLM |
+-------------------+-----+----+
我希望我的数据框只有行
+-------------------+-----+----+
|Date |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:26|ABC |DEF |
|2017-07-04 18:50:21|ABC |KLM |
+-------------------+-----+----+
我试过类似的方法,但这并没有删除重复项。
val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"), col("df1.ColA") === col("df2.ColA") &&
col("df1.ColB") === col("df2.ColB") &&
&& abs(unix_timestamp(col("Date")) - unix_timestamp(col("Date"))) > offset
)
目前,我只是在此处
有人向我提到过创建一个 UDF 来比较日期,如果所有其他列都相同,但我不确定具体该怎么做,这样我要么过滤掉行,要么添加一个标志,然后删除这些行任何帮助将不胜感激。
类似的 sql 问题在这里 Duplicate entries with different timestamp
谢谢!
我会这样做:
- 定义一个 Window 以在虚拟列上排序日期。
- 添加一个虚拟列,并向其添加一个常量值。
- 添加包含上一条记录日期的新列。
- 计算该日期与前一个日期的差值。
- 根据差值过滤您的记录。
代码可以如下所示:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("dummy").orderBy("Date") // step 1
df.withColumn("dummy", lit(1)) // this is step 1
.withColumn("previousDate", lag($"Date", 1) over w) // step 2
.withColumn("difference", unix_timestamp($"Date") - unix_timestamp("previousDate")) // step 3
如果您有成对的记录可能在时间上接近,则上述解决方案有效。如果您有两条以上的记录,您可以将每条记录与 window 中的第一条记录(而不是前一条)进行比较,因此不要使用 lag($"Date",1)
,而是使用 first($"Date")
。在这种情况下,'difference' 列包含当前记录与 window.