如何对时间戳值使用 lag 和 rangeBetween 函数?

How to use lag and rangeBetween functions on timestamp values?

我有这样的数据:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769

我想添加一个新的布尔值列,如果同一 location_point 在 5 分钟内 window 有 2 个或更多 userid,则标记为真。我有一个想法,即使用 lag 函数来查找由 userid 分区的 window 以及当前时间戳和接下来 5 分钟之间的范围:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")

当我使用 RangeBetween 函数时,这段代码给我一个分析异常:

AnalysisException: u'Window Frame RANGE BETWEEN CURRENT ROW AND 1500 FOLLOWING must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;

你知道解决这个问题的方法吗?

rangeBetween 对于像 lag 这样的非聚合函数没有意义。 lag 始终采用特定行,由偏移量参数表示,因此指定帧毫无意义。

要获得 window 时间序列,您可以使用 window 标准聚合分组:

from pyspark.sql.functions import window,  countDistinct


(df
    .groupBy("location_point", window("eventtime", "5 minutes"))
    .agg( countDistinct("userid")))

您可以添加更多参数来修改幻灯片持续时间。

如果按 location 进行分区,您可以尝试使用 window 函数进行类似操作:

windowSpec = (W.partitionBy(col("location"))
  .orderBy(col("eventtime").cast("timestamp").cast("long"))
  .rangeBetween(0, days(5)))


df.withColumn("id_count", countDistinct("userid").over(windowSpec))

鉴于您的数据:

让我们添加一个以秒为单位的时间戳列:

df = df.withColumn('timestamp',df_taf.eventtime.astype('Timestamp').cast("long"))
df.show()

+--------+-------------------+--------------+----------+
|  userid|          eventtime|location_point| timestamp|  
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|
+--------+-------------------+--------------+----------+  

现在,让我们定义一个 window 函数,按 location_point 划分,按时间戳排序,范围在 -300 秒和当前时间之间。我们可以计算这个 window 中的元素数量,并将这些数据放在名为 'occurences in_5_min':

的列中
w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0)
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w))
df.show()

+--------+-------------------+--------------+----------+--------------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|
+--------+-------------------+--------------+----------+--------------------+

现在,如果在特定位置的最后 5 分钟内出现的次数严格大于 1 次,您可以使用 True 添加所需的列:

add_bool = udf(lambda col : True if col>1 else False, BooleanType())
df = df.withColumn('already_occured',add_bool('occurrences_in_5_min'))
df.show()

+--------+-------------------+--------------+----------+--------------------+---------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|          false|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|          false|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|          false|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|          false|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|           true|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|          false|
+--------+-------------------+--------------+----------+--------------------+---------------+