如何计算组中事件之间的时间

How to calculate time between events in a group

如何找到组中事件之间的时间?

例如,我有 Streaming Source (Kafka),我从中获得了很多列。此流被读入 spark、预处理、清理,仅保留以下四列:“ClientTimestamp”、“sensor_type”、“activity”、“User_detail”、

现在,我想计算每个用户的关键 activity 存在的总时间。

 Clientimestamp         Sensor_type     activity         User_detail
4/11/2021 10:00:00      ultrasonic       critical          user_A
4/11/2021 10:00:00      ultrasonic       normal            user_B            
4/11/2021 10:03:00      ultrasonic       normal            user_A
4/11/2021 10:05:00      ultrasonic       critical          user_B
4/11/2021 10:06:00      ultrasonic       critical          user_A
4/11/2021 10:07:00      ultrasonic       critical          user_A
4/11/2021 10:08:00      ultrasonic       critical          user_B
4/11/2021 10:09:00      ultrasonic       critical          user_B

所以对于 user_A,所有关键事件之间的总时间 activity 是通过找出两个关键事件之间的差异并将这些差异相加来计算的。

(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
 therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.

与 user_B、

类似
(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
 userB critical activity lasted for total minute of (3+1) = 4 minute

对于每个 window,我想调用一个自定义函数来计算总时间。如何在按 window 分组的组上应用函数?

df = df.withWatermark("clientTimestamp", "10 minutes")\
       .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity')) 
       .apply(calculate_time)

看起来这可以通过计算 Window 中每个 User_detail 的最大和最小时间之间的差异来解决。此外,可以应用 activity 上的过滤器来忽略“正常”行。

我看不出为什么需要在此处应用自定义函数(例如“calculate_time”)。请注意,我并不完全熟悉 Python 语法,但您的代码可能如下所示:

df = df \
  .filter(df.activity == "critical") \
  .withWatermark("clientTimestamp", "10 minutes") \
  .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
  .agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))