PySpark: group dataframe within a time interval
I have a sorted PySpark dataframe (ascending on 'timestamp' and 'ship'):
+----------------------+------+
| timestamp | ship |
+----------------------+------+
| 2018-08-01 06:01:00 | 1 |
| 2018-08-01 06:01:30 | 1 |
| 2018-08-01 09:00:00 | 1 |
| 2018-08-01 09:00:00 | 2 |
| 2018-08-01 10:15:43 | 2 |
| 2018-08-01 11:00:01 | 3 |
| 2018-08-01 06:00:13 | 4 |
| 2018-08-01 13:00:00 | 4 |
| 2018-08-13 14:00:00 | 5 |
| 2018-08-13 14:15:03 | 5 |
| 2018-08-13 14:45:08 | 5 |
| 2018-08-13 14:50:00 | 5 |
+----------------------+------+
I want to add a new column named 'trip' to the dataframe. A trip groups the records of one ship that fall within the same 2-hour interval, counted from that ship's first record. A new trip number should be assigned in the 'trip' column whenever the ship number changes or a record of the same ship falls into the next 2-hour interval.
The desired output looks like this:
+----------------------+------+-------+
| timestamp | ship | trip |
+----------------------+------+-------+
| 2018-08-01 06:01:00 | 1 | 1 | # start new ship number
| 2018-08-01 06:01:30 | 1 | 1 | # still within 2 hours of same ship number
| 2018-08-01 09:00:00 | 1 | 2 | # more than 2 hours of same ship number = new trip
| 2018-08-01 09:00:00 | 2 | 3 | # new ship number = new trip
| 2018-08-01 10:15:43 | 2 | 3 | # still within 2 hours of same ship number
| 2018-08-01 11:00:01 | 3 | 4 | # new ship number = new trip
| 2018-08-01 06:00:13 | 4 | 5 | # new ship number = new trip
| 2018-08-01 13:00:00 | 4 | 6 | # more than 2 hours of same ship number = new trip
| 2018-08-13 14:00:00 | 5 | 7 | # new ship number = new trip
| 2018-08-13 14:15:03 | 5 | 7 | # still within 2 hours of same ship number
| 2018-08-13 14:45:08 | 5 | 7 | # still within 2 hours of same ship number
| 2018-08-13 14:50:00 | 5 | 7 | # still within 2 hours of same ship number
+----------------------+------+-------+
In pandas this would be done as follows:
dt_trip = 2  # trip duration per ship (in hours)
total_time = df['timestamp'] - df.groupby('ship')['timestamp'].transform('min')
trips = total_time.dt.total_seconds().fillna(0) // (dt_trip * 3600)
df['trip'] = df.groupby(['ship', trips]).ngroup() + 1
How can this be done in PySpark?
Try using window functions: row_number(), collect_list(), and an incremental sum over a conditional flag.
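To make the snippet below reproducible, the sample dataframe can be built like this (a minimal sketch; a running SparkSession named 'spark' is assumed):

from pyspark.sql import functions as F

rows = [
    ("2018-08-01 06:01:00", 1), ("2018-08-01 06:01:30", 1),
    ("2018-08-01 09:00:00", 1), ("2018-08-01 09:00:00", 2),
    ("2018-08-01 10:15:43", 2), ("2018-08-01 11:00:01", 3),
    ("2018-08-01 06:00:13", 4), ("2018-08-01 13:00:00", 4),
    ("2018-08-13 14:00:00", 5), ("2018-08-13 14:15:03", 5),
    ("2018-08-13 14:45:08", 5), ("2018-08-13 14:50:00", 5),
]
# Cast the string column to a real timestamp so unix_timestamp() behaves as expected.
df = spark.createDataFrame(rows, ["timestamp", "ship"]) \
          .withColumn("timestamp", F.to_timestamp("timestamp"))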
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rows of the same ship from 7199 seconds (just under 2 hours) before the current row up to the current row.
w1 = Window.partitionBy("ship").orderBy(F.unix_timestamp("timestamp")).rangeBetween(-7199, Window.currentRow)
# Per-ship ordering, used to detect each ship's first record.
w2 = Window.partitionBy("ship").orderBy("timestamp")
# Global ordering for the running trip counter.
w3 = Window.orderBy("ship", "timestamp")

df.withColumn("trip", F.sum(
        F.when(F.row_number().over(w2) == 1, F.lit(1))                  # first record of a ship
         .when(F.size(F.collect_list("ship").over(w1)) == 1, F.lit(1))  # no earlier record within 2 hours
         .otherwise(F.lit(0))                                           # same trip continues
    ).over(w3)).orderBy("ship", "timestamp").show()
#+-------------------+----+----+
#| timestamp|ship|trip|
#+-------------------+----+----+
#|2018-08-01 06:01:00| 1| 1|
#|2018-08-01 06:01:30| 1| 1|
#|2018-08-01 09:00:00| 1| 2|
#|2018-08-01 09:00:00| 2| 3|
#|2018-08-01 10:15:43| 2| 3|
#|2018-08-01 11:00:01| 3| 4|
#|2018-08-01 06:00:13| 4| 5|
#|2018-08-01 13:00:00| 4| 6|
#|2018-08-13 14:00:00| 5| 7|
#|2018-08-13 14:15:03| 5| 7|
#|2018-08-13 14:45:08| 5| 7|
#|2018-08-13 14:50:00| 5| 7|
#+-------------------+----+----+
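For comparison, the pandas bucketing logic also ports almost literally to PySpark: take each ship's first timestamp, floor-divide the elapsed seconds into 2-hour buckets, and number the (ship, bucket) groups with dense_rank(), which plays the role of ngroup() + 1. A minimal sketch (it yields the same trip numbers on this sample data):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

dt_trip = 2 * 3600  # trip duration per ship, in seconds
w_ship = Window.partitionBy("ship")

# Seconds since the ship's first record, floor-divided into 2-hour buckets.
bucket = F.floor(
    (F.unix_timestamp("timestamp")
     - F.min(F.unix_timestamp("timestamp")).over(w_ship)) / dt_trip
)

df.withColumn("bucket", bucket) \
  .withColumn("trip", F.dense_rank().over(Window.orderBy("ship", "bucket"))) \
  .drop("bucket") \
  .orderBy("ship", "timestamp").show()

Note that Window.orderBy(...) without partitionBy (used both here and as w3 above) pulls all rows into a single partition; that is fine for small data like this but will not scale to large dataframes.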