滞后函数尊重数据的排序方式,但会跨组,这是我不想要的:我一定是误解了 window 和滞后函数的工作方式

A lag function honors how data are sorted, but crosses a group, and this I do not want: I must be mistunderstanding how window and lag functions work

我正在进行 covid 数据提取,我想比较每个法国部门(deplib_dep:代码和名称),每天住院的人数(hosp) 5 天前已知的病例数。

为此,我 运行 这个脚本来自一个名为 synthese:

的变量中的数据集
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec

val w:WindowSpec = Window.orderBy("dep", "date");
var cas_relatif_hospitalisation = lag("cas", 5).over(w);

synthese = synthese.withColumn("cas_relatif_hospitalisation",
    cas_relatif_hospitalisation)

synthese.select("date", "lib_dep", "populationDepartementale", "tx_incid", "cas", 
   "cas_relatif_hospitalisation", "hosp", "rea").show(1000, true)

这是我收到的部分结果:

date lib_dep populationDepartementale tx_incid cas cas_relatif_hospitalisation hosp rea
2022-01-02 Ain 647634 1578.03806957859 10220.0 6809.0 106 15
2022-01-03 Ain 647634 1879.58079320501 12173.0 7852.0 109 15
2022-01-04 Ain 647634 2104.10149858057 13627.0 8977.0 97 10
2022-01-05 Ain 647634 2241.24940064388 14515.0 9864.0 112 10
2022-01-06 Ain 647634 2332.12320478572 15104.0 9935.0 118 11
2022-01-07 Ain 647634 2473.83762967022 16021.0 10220.0 116 11
2022-01-08 Ain 647634 2696.83616077205 17466.0 12173.0 112 13
2022-01-09 Ain 647634 2743.11025869352 17765.0 13627.0 112 13
2022-01-10 Ain 647634 2800.34401138586 18136.0 14515.0 122 13
2022-01-11 Ain 647634 null null 15104.0 139 13
2022-01-12 Ain 647634 null null 16021.0 147 13
2022-01-13 Ain 647634 null null 17466.0 143 11
2020-03-18 Aisne 533316 null null 17765.0 41 10
2020-03-19 Aisne (!!!) 533316 null null 18136.0 (!!!) 43 15
2020-03-20 Aisne 533316 null null null 52 19
2020-03-21 Aisne 533316 null null null 61 20
2020-03-22 Aisne 533316 null null null 69 21

当我看到 Ain 部门 2022-01-13 (YMD) 的 cas_relatif_hospitalisationcas 的 D-5 值)时,它具有我愿意的价值:2022 年 1 月 8 日 cas 列的内容。

但是当我看到紧随其后的 cas_relatif_hospitalisation 值时,因为该部门现在已经更改为 Aisne,我注意到 D-5 值(Aisne, 2020-03-19) 是 (Ain, 2022-01-10) 之一。这不是我所期待的...

是的,spark 倒退了五个记录。循序渐进
但对我来说,部门是一个边界,一个 lag 函数不应该跨越的边界,我期待一个空值。但确实如此。

我哪里做错了,或者对 lagwindow 函数的用法有误解?

@blackbishop 在评论中发现

我使用的是 non-partitioned window。 我需要按部门分区:Window.partitionBy("lib_dep").orderBy("date")