如何根据我在 pyspark 中的前一行值向给定时间戳添加分钟数

How can I add minutes to given timestamp based on my previous row value in pyspark

我有一个 pyspark 数据框

   +----------+----------+---------------------+
   | Activity | Interval |    ReadDateTime     |
   +----------+----------+---------------------+
   |    A     |    1     | 2019-12-13 10:00:00 | 
   |    A     |    2     | 2019-12-13 10:00:00 |
   |    A     |    3     | 2019-12-13 10:00:00 |
   |    B     |    1     | 2019-12-13 11:00:00 | 
   |    B     |    2     | 2019-12-13 11:00:00 |
   |    B     |    3     | 2019-12-13 11:00:00 |
   +--------- +----------+---------------------+

现在我必须根据前一行中的值向 ReadDateTime 列添加 5 分钟。我预期的数据框如下所示

   +----------+----------+---------------------+
   | Activity | Interval |    ReadDateTime     |
   +----------+----------+---------------------+
   |    A     |    1     | 2019-12-13 10:00:00 | 
   |    A     |    2     | 2019-12-13 10:05:00 |
   |    A     |    3     | 2019-12-13 10:10:00 |
   |    B     |    1     | 2019-12-13 11:00:00 | 
   |    B     |    2     | 2019-12-13 11:05:00 |
   |    B     |    3     | 2019-12-13 11:10:00 |
   +--------- +----------+---------------------+

我不会向对应于间隔 1 的 ReadDateTime 列添加 5 分钟,而我会继续向其他行添加 5 分钟,直到我的 activity 更改

有一个丑陋的方法

from pyspark.sql.functions import *
from pyspark.sql.types import StringType

def update(interval,date):
  if (interval == 1):
    return date
  elif (interval == 2):
    return date + 'add 5 min'
  elif (interval == 3):
    return date + 'add 10 min'

#df.dtypes

my_udf = udf(lambda x,y: update(x,y), StringType())

df.withColumn('updated_realDateTime', my_udf(df.interval, df.realDateTime) ).show(truncate=False)

当然我的更新函数不是你想要的,所以你必须改变它,但它会完成工作(如果所有间隔的模式都相同,你不需要 elifs,你可以做到动态)

这是为任何有更好答案的人创建数据框的代码

data = [ (1,'2019-12-13 10:00:00'), 
   (2, '2019-12-13 10:00:00'),
   (3, '2019-12-13 10:00:00'),
   (1, '2019-12-13 11:00:00'), 
   (2, '2019-12-13 11:00:00'),
   (3, '2019-12-13 11:00:00')]
df = sqlContext.createDataFrame(data, ['interval','realDateTime']).cache()

感谢 Ali Yesilli 的 post,我找到了解决方案 .

我首先将我的 ReadDateTime 转换为 unix 时间戳,并且仅当我的间隔不等于 1 时才向其添加 5 分钟。所以我的代码如下所示。

   from pyspark.sql.functions import *

   df = df.withColumn("ReadDateTime1", when(col("Interval") != lit(1),
   col("ReadDateTime") + 
   (col("Interval")*expr("Interval 5 minutes"))).otherwise(col('ReadDateTime')))