How to use PySpark to analyze the following events?
Suppose we have a DataFrame (df) in PySpark. How can I use PySpark to get the duration, in minutes, between each biking event and its corresponding driving event? Assume each biking event has exactly one corresponding driving event, but there can be multiple biking-driving pairs within a single day. The final result should go into a DataFrame with columns such as biking_time, biking_event, driving_time, driving_event, and each_durations.
Note: there can be other events between biking and driving; for example, a person could start by biking, then go running and swimming, and then drive.
See the table below for an example:
On 03/01/2018, the duration between biking and driving is: 8:12 - 5:12 = 3 hours = 180 minutes.
|   | TimeDetails    | Event    |
|---|----------------|----------|
| 1 | 3/1/2018 5:12  | Biking   |
| 2 | 3/1/2018 6:12  | Swimming |
| 3 | 3/1/2018 7:12  | Hiking   |
| 4 | 3/1/2018 8:12  | Driving  |
| 5 | 3/2/2018 9:12  | Biking   |
| 6 | 3/2/2018 10:12 | Swimming |
| 7 | 3/2/2018 11:12 | Swimming |
| 8 | 3/2/2018 12:12 | Driving  |
| 9 | 3/2/2018 13:12 | Swimming |
Here is the sample output:
|   | biking_time   | event_name1 | driving_time   | event_name2 | durations_inMins |
|---|---------------|-------------|----------------|-------------|------------------|
| 1 | 3/1/2018 5:12 | biking      | 3/1/2018 8:12  | driving     | 180              |
| 2 | 3/2/2018 9:12 | biking      | 3/2/2018 12:12 | driving     | 180              |
Here is some of my code so far:

biking_df = df.filter(df.Event == 'Biking')
driving_df = df.filter(df.Event == 'Driving')

Could someone provide me with some code in PySpark? Thank you very much.
Your example (I added another day that is missing a driving record; the solution below now handles that case as well):
df = spark.createDataFrame(
[
('1','3/1/2018 5:12','Biking')
,('2','3/1/2018 6:12','Swimming')
,('3','3/1/2018 7:12','Hiking')
,('4','3/1/2018 8:12','Driving')
,('5','3/2/2018 9:12','Biking')
,('6','3/2/2018 10:12','Swimming')
,('7','3/2/2018 11:12','Swimming')
,('8','3/2/2018 12:12','Driving')
,('9','3/2/2018 13:12','Swimming')
,('10','3/3/2018 9:10','Biking')
,('11','3/3/2018 9:50','Swimming')
,('12','3/3/2018 10:30','Swimming')
,('13','3/3/2018 11:12','Hiking')
], ['index','TimeDetails','Event']
)
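
As a side note, the solution below parses these strings with the pattern 'M/d/y H:m'. A minimal standalone check of that pattern (a hypothetical snippet, not part of the original answer):

from pyspark.sql import functions as F

# 'M/d/y H:m' should turn '3/1/2018 5:12' into 2018-03-01 05:12:00.
spark.createDataFrame([('3/1/2018 5:12',)], ['t'])\
    .select(F.to_timestamp('t', 'M/d/y H:m').alias('parsed'))\
    .show()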
Solution:
from pyspark.sql import functions as F
df = df\
.withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
.withColumn('date', F.to_date('TimeDetails'))
#Finding all possible dates in the original dataset:
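# sequence(min_date, max_date, interval 1 day) builds an array of consecutive
# days from the earliest to the latest date, and explode turns that array
# into one row per day, so days with no events at all are still covered.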
date_interval = df\
.agg(
F.date_trunc("dd", F.max(F.col("date"))).alias("max_date"),
F.date_trunc("dd", F.min(F.col("date"))).alias("min_date"))\
.withColumn('date_interval', F.explode(F.expr('sequence(min_date, max_date, interval 1 day)')))\
.select('date_interval')\
.withColumn('date_interval', F.to_date('date_interval'))
#Imputing those dates on the biking and driving subsets
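# The left join keeps one row per calendar day even when that day has no
# matching event, and coalesce fills in the event label for those empty rows.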
biking_df = date_interval\
.join(df.filter(df.Event == 'Biking'),date_interval.date_interval == df.date,'left')\
.withColumn('Event', F.coalesce(F.col('Event'), F.lit('Biking')))\
.select('date_interval',F.col('TimeDetails').alias('biking_time'),F.col('Event').alias('event_name1'))
driving_df = date_interval\
.join(df.filter(df.Event == 'Driving'),date_interval.date_interval == df.date,'left')\
.withColumn('Event', F.coalesce(F.col('Event'), F.lit('Driving')))\
.select('date_interval',F.col('TimeDetails').alias('driving_time'),F.col('Event').alias('event_name2'))
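# Join the two per-day subsets on the calendar day and compute the duration
# in minutes via epoch seconds.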
result = biking_df\
.join(driving_df, 'date_interval')\
.withColumn('durations_inMins',(F.unix_timestamp("driving_time") - F.unix_timestamp('biking_time'))/60)\
.select('biking_time','event_name1','driving_time','event_name2','durations_inMins')
result.show()
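
If you want integer minutes, as in the sample output in the question, you could additionally cast the column (a hypothetical extra step, not part of the original answer):

result = result.withColumn('durations_inMins', F.col('durations_inMins').cast('int'))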
Output:
+-------------------+-----------+-------------------+-----------+----------------+
| biking_time|event_name1| driving_time|event_name2|durations_inMins|
+-------------------+-----------+-------------------+-----------+----------------+
|2018-03-01 05:12:00| Biking|2018-03-01 08:12:00| Driving| 180.0|
|2018-03-02 09:12:00| Biking|2018-03-02 12:12:00| Driving| 180.0|
|2018-03-03 09:10:00| Biking| null| Driving| null|
+-------------------+-----------+-------------------+-----------+----------------+
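
One caveat: the date join above pairs every biking event with every driving event on the same day, so a day that genuinely contains several biking-driving pairs would produce a cross product. Below is a sketch of one way to pair each biking event with the first driving event that follows it, using a window function instead (an alternative under the stated assumptions, reusing the df with the parsed TimeDetails and date columns from above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# For each row, look at the same day from the current row onwards (ordered by
# time) and take the earliest Driving timestamp in that frame.
w = Window.partitionBy('date').orderBy('TimeDetails')\
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

paired = df\
    .withColumn('driving_time',
                F.min(F.when(F.col('Event') == 'Driving', F.col('TimeDetails'))).over(w))\
    .filter(F.col('Event') == 'Biking')\
    .select(F.col('TimeDetails').alias('biking_time'),
            F.col('Event').alias('event_name1'),
            'driving_time',
            F.lit('Driving').alias('event_name2'))\
    .withColumn('durations_inMins',
                (F.unix_timestamp('driving_time') - F.unix_timestamp('biking_time'))/60)
paired.show()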