如何计算组中事件之间的时间
How to calculate time between events in a group
如何找到组中事件之间的时间?
例如,我有 Streaming Source (Kafka),我从中获得了很多列。此流被读入 spark、预处理、清理,仅保留以下四列:“ClientTimestamp”、“sensor_type”、“activity”、“User_detail”、
现在,我想计算每个用户的关键 activity 存在的总时间。
Clientimestamp Sensor_type activity User_detail
4/11/2021 10:00:00 ultrasonic critical user_A
4/11/2021 10:00:00 ultrasonic normal user_B
4/11/2021 10:03:00 ultrasonic normal user_A
4/11/2021 10:05:00 ultrasonic critical user_B
4/11/2021 10:06:00 ultrasonic critical user_A
4/11/2021 10:07:00 ultrasonic critical user_A
4/11/2021 10:08:00 ultrasonic critical user_B
4/11/2021 10:09:00 ultrasonic critical user_B
所以对于 user_A,所有关键事件之间的总时间 activity 是通过找出两个关键事件之间的差异并将这些差异相加来计算的。
(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.
与 user_B、
类似
(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
userB critical activity lasted for total minute of (3+1) = 4 minute
对于每个 window,我想调用一个自定义函数来计算总时间。如何在按 window 分组的组上应用函数?
df = df.withWatermark("clientTimestamp", "10 minutes")\
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity'))
.apply(calculate_time)
看起来这可以通过计算 Window 中每个 User_detail 的最大和最小时间之间的差异来解决。此外,可以应用 activity 上的过滤器来忽略“正常”行。
我看不出为什么需要在此处应用自定义函数(例如“calculate_time”)。请注意,我并不完全熟悉 Python 语法,但您的代码可能如下所示:
df = df \
.filter(df.activity == "critical") \
.withWatermark("clientTimestamp", "10 minutes") \
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
.agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))
如何找到组中事件之间的时间?
例如,我有 Streaming Source (Kafka),我从中获得了很多列。此流被读入 spark、预处理、清理,仅保留以下四列:“ClientTimestamp”、“sensor_type”、“activity”、“User_detail”、
现在,我想计算每个用户的关键 activity 存在的总时间。
Clientimestamp Sensor_type activity User_detail
4/11/2021 10:00:00 ultrasonic critical user_A
4/11/2021 10:00:00 ultrasonic normal user_B
4/11/2021 10:03:00 ultrasonic normal user_A
4/11/2021 10:05:00 ultrasonic critical user_B
4/11/2021 10:06:00 ultrasonic critical user_A
4/11/2021 10:07:00 ultrasonic critical user_A
4/11/2021 10:08:00 ultrasonic critical user_B
4/11/2021 10:09:00 ultrasonic critical user_B
所以对于 user_A,所有关键事件之间的总时间 activity 是通过找出两个关键事件之间的差异并将这些差异相加来计算的。
(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.
与 user_B、
类似(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
userB critical activity lasted for total minute of (3+1) = 4 minute
对于每个 window,我想调用一个自定义函数来计算总时间。如何在按 window 分组的组上应用函数?
df = df.withWatermark("clientTimestamp", "10 minutes")\
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity'))
.apply(calculate_time)
看起来这可以通过计算 Window 中每个 User_detail 的最大和最小时间之间的差异来解决。此外,可以应用 activity 上的过滤器来忽略“正常”行。
我看不出为什么需要在此处应用自定义函数(例如“calculate_time”)。请注意,我并不完全熟悉 Python 语法,但您的代码可能如下所示:
df = df \
.filter(df.activity == "critical") \
.withWatermark("clientTimestamp", "10 minutes") \
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
.agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))