时间段在具有特定状态(列值)的行上的分布
Distribution of time periods over rows with certain status (column value)
我有一个包含日志的 Pyspark 数据框,每行对应于系统在记录时的状态,以及一个组号。我想找出每个组处于不健康状态的时间段的长度。
例如,如果这是我的 table:
TIMESTAMP | STATUS_CODE | GROUP_NUMBER
--------------------------------------
02:03:11 | healthy | 000001
02:03:04 | healthy | 000001
02:03:03 | unhealthy | 000001
02:03:00 | unhealthy | 000001
02:02:58 | healthy | 000008
02:02:57 | healthy | 000008
02:02:55 | unhealthy | 000001
02:02:54 | healthy | 000001
02:02:50 | healthy | 000007
02:02:48 | healthy | 000004
我想要 return 组 000001 有 9 秒的不健康时间段(从 02:02:55 到 02:03:04)。
其他群体也可能有不健康的时间段,我也想 return 那些。
由于连续行可能具有相同的状态,并且由于不同组的行散布,我正在努力寻找一种有效地执行此操作的方法。
我无法将 Pyspark 数据帧转换为 Pandas 数据帧,因为它太大了。
如何有效地确定这些时间段的长度?
非常感谢!
一个简单的方法(可能不是最优的)是:
- 映射到
[K,V]
,使用 GROUP_NUMBER 作为键 K
- 使用
repartitionAndSortWithinPartitions
,这样您将在同一分区中获得每个组的所有数据,并按 TIMESTAMP
对它们进行排序。这个答案中详细解释了它是如何工作的:
- 最后使用
mapPartitions
获取单个分区中已排序数据的迭代器,因此您可以轻松找到所需的答案。 (mapPartitions
的解释:How does the pyspark mapPartitions function work?)
带有 spark-sql 解决方案的 pyspark 看起来像这样。
首先我们创建示例数据集。除了数据集,我们在组上生成 row_number 字段分区并按时间戳排序。然后我们将生成的数据框注册为 table 说 table1
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import unix_timestamp
df = spark.createDataFrame([
('2017-01-01 02:03:11','healthy','000001'),
('2017-01-01 02:03:04','healthy','000001'),
('2017-01-01 02:03:03','unhealthy','000001'),
('2017-01-01 02:03:00','unhealthy','000001'),
('2017-01-01 02:02:58','healthy','000008'),
('2017-01-01 02:02:57','healthy','000008'),
('2017-01-01 02:02:55','unhealthy','000001'),
('2017-01-01 02:02:54','healthy','000001'),
('2017-01-01 02:02:50','healthy','000007'),
('2017-01-01 02:02:48','healthy','000004')
],['timestamp','state','group_id'])
df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp))))
df.registerTempTable("table1")
一旦数据框被注册为 table (table1
)。所需的数据可以使用 spark-sql
计算如下
>>> spark.sql("""
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration
... FROM
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
... LEFT JOIN
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
... ON t1.group_id = t2.group_id
... AND t1.rownum = t2.rownum - 1
... group by t1.group_id
... """).show()
+--------+--------+
|group_id|duration|
+--------+--------+
| 000001| 9|
+--------+--------+
样本数据集只有 group_id 00001
的数据不健康。但此解决方案适用于其他 group_id 处于不健康状态的情况。
我有一个包含日志的 Pyspark 数据框,每行对应于系统在记录时的状态,以及一个组号。我想找出每个组处于不健康状态的时间段的长度。
例如,如果这是我的 table:
TIMESTAMP | STATUS_CODE | GROUP_NUMBER
--------------------------------------
02:03:11 | healthy | 000001
02:03:04 | healthy | 000001
02:03:03 | unhealthy | 000001
02:03:00 | unhealthy | 000001
02:02:58 | healthy | 000008
02:02:57 | healthy | 000008
02:02:55 | unhealthy | 000001
02:02:54 | healthy | 000001
02:02:50 | healthy | 000007
02:02:48 | healthy | 000004
我想要 return 组 000001 有 9 秒的不健康时间段(从 02:02:55 到 02:03:04)。
其他群体也可能有不健康的时间段,我也想 return 那些。
由于连续行可能具有相同的状态,并且由于不同组的行散布,我正在努力寻找一种有效地执行此操作的方法。
我无法将 Pyspark 数据帧转换为 Pandas 数据帧,因为它太大了。
如何有效地确定这些时间段的长度?
非常感谢!
一个简单的方法(可能不是最优的)是:
- 映射到
[K,V]
,使用 GROUP_NUMBER 作为键 K - 使用
repartitionAndSortWithinPartitions
,这样您将在同一分区中获得每个组的所有数据,并按TIMESTAMP
对它们进行排序。这个答案中详细解释了它是如何工作的: - 最后使用
mapPartitions
获取单个分区中已排序数据的迭代器,因此您可以轻松找到所需的答案。 (mapPartitions
的解释:How does the pyspark mapPartitions function work?)
带有 spark-sql 解决方案的 pyspark 看起来像这样。
首先我们创建示例数据集。除了数据集,我们在组上生成 row_number 字段分区并按时间戳排序。然后我们将生成的数据框注册为 table 说 table1
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import unix_timestamp
df = spark.createDataFrame([
('2017-01-01 02:03:11','healthy','000001'),
('2017-01-01 02:03:04','healthy','000001'),
('2017-01-01 02:03:03','unhealthy','000001'),
('2017-01-01 02:03:00','unhealthy','000001'),
('2017-01-01 02:02:58','healthy','000008'),
('2017-01-01 02:02:57','healthy','000008'),
('2017-01-01 02:02:55','unhealthy','000001'),
('2017-01-01 02:02:54','healthy','000001'),
('2017-01-01 02:02:50','healthy','000007'),
('2017-01-01 02:02:48','healthy','000004')
],['timestamp','state','group_id'])
df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp))))
df.registerTempTable("table1")
一旦数据框被注册为 table (table1
)。所需的数据可以使用 spark-sql
>>> spark.sql("""
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration
... FROM
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
... LEFT JOIN
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
... ON t1.group_id = t2.group_id
... AND t1.rownum = t2.rownum - 1
... group by t1.group_id
... """).show()
+--------+--------+
|group_id|duration|
+--------+--------+
| 000001| 9|
+--------+--------+
样本数据集只有 group_id 00001
的数据不健康。但此解决方案适用于其他 group_id 处于不健康状态的情况。