SPARK,DataFrame:时间戳列与连续行的差异
SPARK, DataFrame: difference of Timestamp columns over consecutive rows
我有一个 DateFrame 如下:
+---+---------------------+---------------------+
|id |initDate |endDate |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null |
+---+---------------------+---------------------+
行按 id
然后 initDate
列升序排列。
initDate
和 endDate
列都具有时间戳类型。为了便于说明,我只显示了属于一个 id
值的记录。
我的目标是添加一个新列,为每个 id
显示 每行的 initDate
与 [=15] 之间的差异(天数) =] 上一行.
如果没有上一行,则该值为 -1。
输出应如下所示:
+---+---------------------+---------------------+----------+
|id |initDate |endDate |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1 |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11 |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12 |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0 |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7 |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4 |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8 |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12 |
|138|2016-10-04 00:00:00.0|null |7 |
+---+---------------------+---------------------+----------+
我正在考虑使用 window 函数按 id
对记录进行分区,但我不知道如何执行后续步骤。
尝试:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("endDate")
df.withColumn("difference", date_sub($"initDate", lag($"endDate", 1).over(w)))
感谢@lostInOverflow 的指点,我想到了以下解决方案:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("initDate")
val previousEnd = lag($"endDate", 1).over(w)
filteredDF.withColumn("prev", previousEnd)
.withColumn("difference", datediff($"initDate", $"prev"))
只是对以前好的答案的补充,以防有人想尝试使用 spark sql 或 Hive。
select tab.tran_id,tab.init_date,tab.end_date,coalesce(tab.day_diff,-1)
as day_diffrence from
(select *,datediff(day,lag(end_date,1) over(partition by tran_id order by init_date)
,init_date) as day_diff from your_table) tab
;
我有一个 DateFrame 如下:
+---+---------------------+---------------------+
|id |initDate |endDate |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null |
+---+---------------------+---------------------+
行按 id
然后 initDate
列升序排列。
initDate
和 endDate
列都具有时间戳类型。为了便于说明,我只显示了属于一个 id
值的记录。
我的目标是添加一个新列,为每个 id
显示 每行的 initDate
与 [=15] 之间的差异(天数) =] 上一行.
如果没有上一行,则该值为 -1。
输出应如下所示:
+---+---------------------+---------------------+----------+
|id |initDate |endDate |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1 |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11 |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12 |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0 |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7 |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4 |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8 |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12 |
|138|2016-10-04 00:00:00.0|null |7 |
+---+---------------------+---------------------+----------+
我正在考虑使用 window 函数按 id
对记录进行分区,但我不知道如何执行后续步骤。
尝试:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("endDate")
df.withColumn("difference", date_sub($"initDate", lag($"endDate", 1).over(w)))
感谢@lostInOverflow 的指点,我想到了以下解决方案:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("initDate")
val previousEnd = lag($"endDate", 1).over(w)
filteredDF.withColumn("prev", previousEnd)
.withColumn("difference", datediff($"initDate", $"prev"))
只是对以前好的答案的补充,以防有人想尝试使用 spark sql 或 Hive。
select tab.tran_id,tab.init_date,tab.end_date,coalesce(tab.day_diff,-1)
as day_diffrence from
(select *,datediff(day,lag(end_date,1) over(partition by tran_id order by init_date)
,init_date) as day_diff from your_table) tab
;