根据条件时间戳获取数据框中的最新记录
Get latest records in a data frame based on time stamp with condition
我的问题标题可能不准确,但我希望我能够解释我的问题
所以我有一个如下所示的数据框
DataPartition_1|^|PartitionYear_1|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber_1|^|FFAction_1
SelfSourcedPublic|^|2001|^|1510044629598|^|4295858941|^|5|^|21|^|2|^|I|!|
SelfSourcedPublic|^|2002|^|1510044629599|^|4295858941|^|1|^|22|^|2|^|I|!|
SelfSourcedPublic|^|2002|^|1510044629600|^|4295858941|^|1|^|23|^|2|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629601|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629624|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1510044629626|^|4295858941|^|null|^|37|^|null|^|D|!|
SelfSourcedPublic|^|2001|^|1510044629596|^|4295858941|^|19|^|5|^|1|^|I|!|
SelfSourcedPublic|^|2001|^|1510044629597|^|4295858941|^|20|^|5|^|2|^|I|!|
SelfSourcedPublic|^|2001|^|1510044629598|^|4295858941|^|21|^|5|^|2|^|I|!|
所以我的数据框的主键是
OrganizationId", "AnnualPeriodId","InterimPeriodId"
下面是我根据时间戳获取最新记录并按主键序列排列的代码。
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
现在我的问题是有时我在一些主键列中得到空值,比如带有时间戳的记录 1510044629624
。
现在我的要求是下面的记录具有相同的主键,除了第一个有 null 。在这种情况下,我仍然只需要一个具有最新时间戳的记录
SelfSourcedPublic|^|2016|^|1510044629601|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
我应该得到 SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
由于 null ..
,我当前的代码给出了包含两条记录的输出
我希望我的问题很清楚。
据我从你的问题中了解到,你正在使用一个额外的列作为主键。
AnnualPeriodId
列正在变得 null
并且由于您在 partitionBy
中使用该字段,它导致 null 成为单独的组并因此分开 row
val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
所以解决方法是把它从partitionBy
中去掉,这样上面那行就变成了
val windowSpec = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
我希望这能解决您遇到的问题。
我的问题标题可能不准确,但我希望我能够解释我的问题 所以我有一个如下所示的数据框
DataPartition_1|^|PartitionYear_1|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber_1|^|FFAction_1
SelfSourcedPublic|^|2001|^|1510044629598|^|4295858941|^|5|^|21|^|2|^|I|!|
SelfSourcedPublic|^|2002|^|1510044629599|^|4295858941|^|1|^|22|^|2|^|I|!|
SelfSourcedPublic|^|2002|^|1510044629600|^|4295858941|^|1|^|23|^|2|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629601|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629624|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1510044629626|^|4295858941|^|null|^|37|^|null|^|D|!|
SelfSourcedPublic|^|2001|^|1510044629596|^|4295858941|^|19|^|5|^|1|^|I|!|
SelfSourcedPublic|^|2001|^|1510044629597|^|4295858941|^|20|^|5|^|2|^|I|!|
SelfSourcedPublic|^|2001|^|1510044629598|^|4295858941|^|21|^|5|^|2|^|I|!|
所以我的数据框的主键是
OrganizationId", "AnnualPeriodId","InterimPeriodId"
下面是我根据时间戳获取最新记录并按主键序列排列的代码。
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
现在我的问题是有时我在一些主键列中得到空值,比如带有时间戳的记录 1510044629624
。
现在我的要求是下面的记录具有相同的主键,除了第一个有 null 。在这种情况下,我仍然只需要一个具有最新时间戳的记录
SelfSourcedPublic|^|2016|^|1510044629601|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
我应该得到 SelfSourcedPublic|^|2016|^|1510044629625|^|4295858941|^|null|^|36|^|null|^|D|!|
由于 null ..
,我当前的代码给出了包含两条记录的输出我希望我的问题很清楚。
据我从你的问题中了解到,你正在使用一个额外的列作为主键。
AnnualPeriodId
列正在变得 null
并且由于您在 partitionBy
中使用该字段,它导致 null 成为单独的组并因此分开 row
val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
所以解决方法是把它从partitionBy
中去掉,这样上面那行就变成了
val windowSpec = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
我希望这能解决您遇到的问题。