How to use PySpark to analyze the following events?

Suppose we have a DataFrame (df) in PySpark. How can I use PySpark to get the duration, in minutes, between each biking event and its driving event? Assume that each biking event has only one corresponding driving event; however, there can be multiple biking-driving pairs within a single day. Finally, store the result in a DataFrame with columns including biking_time, biking_event, driving_time, driving_event, and each_durations.

Note: there can be other events between the biking and the driving; for example, a person can start with biking, then run, swim, and then drive.

For an example, refer to the table below:

On 03/01/2018, the duration between biking and driving is: 8:12 - 5:12 = 3 hours = 180 minutes.

   TimeDetails     Event
1  3/1/2018 5:12   Biking
2  3/1/2018 6:12   Swimming
3  3/1/2018 7:12   Hiking
4  3/1/2018 8:12   Driving
5  3/2/2018 9:12   Biking
6  3/2/2018 10:12  Swimming
7  3/2/2018 11:12  Swimming
8  3/2/2018 12:12  Driving
9  3/2/2018 13:12  Swimming

Below is the sample output:

   biking_time    event_name1  driving_time    event_name2  durations_inMins
1  3/1/2018 5:12  biking       3/1/2018 8:12   driving      180
2  3/2/2018 9:12  biking       3/2/2018 12:12  driving      180

Below is some of my code so far:

biking_df = df.filter(df.Event == 'Biking')
driving_df = df.filter(df.Event == 'Driving')

Could someone provide me with some PySpark code for this? Thanks a lot.

Your example (I added another day that is missing a driving record; the solution below now handles that case as well):
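If you are running this as a standalone script rather than in spark-shell or a notebook (where spark already exists), a minimal session setup with default configuration would be:

from pyspark.sql import SparkSession

# Only needed outside spark-shell / notebooks, where `spark` is predefined.
spark = SparkSession.builder.getOrCreate()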

df = spark.createDataFrame(
  [
('1','3/1/2018 5:12','Biking')
,('2','3/1/2018 6:12','Swimming')
,('3','3/1/2018 7:12','Hiking')
,('4','3/1/2018 8:12','Driving')
,('5','3/2/2018 9:12','Biking')
,('6','3/2/2018 10:12','Swimming')
,('7','3/2/2018 11:12','Swimming')
,('8','3/2/2018 12:12','Driving')
,('9','3/2/2018 13:12','Swimming')
,('10','3/3/2018 9:10','Biking')
,('11','3/3/2018 9:50','Swimming')
,('12','3/3/2018 10:30','Swimming')
,('13','3/3/2018 11:12','Hiking')
  ], ['index','TimeDetails','Event']
)

Solution:

from pyspark.sql import functions as F

# Parse the raw strings: 'M/d/y H:m' matches inputs like '3/1/2018 5:12';
# to_date then truncates each timestamp to its calendar day.
df = df\
.withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
.withColumn('date', F.to_date('TimeDetails'))

# Find all dates spanned by the original dataset:
date_interval = df\
                .agg(
                    F.date_trunc("dd", F.max(F.col("date"))).alias("max_date"),
                    F.date_trunc("dd", F.min(F.col("date"))).alias("min_date"))\
                .withColumn('date_interval', F.explode(F.expr('sequence(min_date, max_date, interval 1 day)')))\
                .select('date_interval')\
                .withColumn('date_interval', F.to_date('date_interval'))
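# For the sample data this yields one row per day: 2018-03-01 through 2018-03-03.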

# Impute those dates onto the biking and driving subsets: the left joins
# keep days with no matching event, and coalesce fills in the event label.

biking_df = date_interval\
                .join(df.filter(df.Event == 'Biking'),date_interval.date_interval == df.date,'left')\
                .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Biking')))\
                .select('date_interval',F.col('TimeDetails').alias('biking_time'),F.col('Event').alias('event_name1'))

driving_df = date_interval\
                .join(df.filter(df.Event == 'Driving'),date_interval.date_interval == df.date,'left')\
                .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Driving')))\
                .select('date_interval',F.col('TimeDetails').alias('driving_time'),F.col('Event').alias('event_name2'))

# Pair each day's biking row with its driving row and convert the
# time difference from seconds to minutes.
result = biking_df\
    .join(driving_df, 'date_interval')\
    .withColumn('durations_inMins',(F.unix_timestamp('driving_time') - F.unix_timestamp('biking_time'))/60)\
    .select('biking_time','event_name1','driving_time','event_name2','durations_inMins')


result.show()
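One caveat: the date join pairs a single biking row with a single driving row per day, so if a day really contains multiple biking-driving pairs (which the question allows), the join would produce a row for every combination. A possible variation for that case is to pair each Biking row with the next Driving row via a window function. This is only a sketch under that assumption, not part of the solution above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# For every row, look ahead to the first Driving timestamp after it.
# Note: a window with no partitionBy pulls all rows into one partition;
# fine for small data, but partition by date or user in practice.
w = Window.orderBy('TimeDetails').rowsBetween(1, Window.unboundedFollowing)

pairs = df\
    .withColumn('next_driving',
        F.first(F.when(df.Event == 'Driving', df.TimeDetails), ignorenulls=True).over(w))\
    .filter(df.Event == 'Biking')\
    .select(df.TimeDetails.alias('biking_time'),
        F.col('next_driving').alias('driving_time'))\
    .withColumn('durations_inMins',
        (F.unix_timestamp('driving_time') - F.unix_timestamp('biking_time'))/60)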

Output:

+-------------------+-----------+-------------------+-----------+----------------+
|        biking_time|event_name1|       driving_time|event_name2|durations_inMins|
+-------------------+-----------+-------------------+-----------+----------------+
|2018-03-01 05:12:00|     Biking|2018-03-01 08:12:00|    Driving|           180.0|
|2018-03-02 09:12:00|     Biking|2018-03-02 12:12:00|    Driving|           180.0|
|2018-03-03 09:10:00|     Biking|               null|    Driving|            null|
+-------------------+-----------+-------------------+-----------+----------------+
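If you prefer whole-number minutes rather than the 180.0 doubles shown above, a final cast does it (nulls stay null):

result = result.withColumn('durations_inMins', F.col('durations_inMins').cast('int'))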