如何根据我在 pyspark 中的前一行值向给定时间戳添加分钟数
How can I add minutes to given timestamp based on my previous row value in pyspark
我有一个 pyspark 数据框
+----------+----------+---------------------+
| Activity | Interval | ReadDateTime |
+----------+----------+---------------------+
| A | 1 | 2019-12-13 10:00:00 |
| A | 2 | 2019-12-13 10:00:00 |
| A | 3 | 2019-12-13 10:00:00 |
| B | 1 | 2019-12-13 11:00:00 |
| B | 2 | 2019-12-13 11:00:00 |
| B | 3 | 2019-12-13 11:00:00 |
+--------- +----------+---------------------+
现在我必须根据前一行中的值向 ReadDateTime 列添加 5 分钟。我预期的数据框如下所示
+----------+----------+---------------------+
| Activity | Interval | ReadDateTime |
+----------+----------+---------------------+
| A | 1 | 2019-12-13 10:00:00 |
| A | 2 | 2019-12-13 10:05:00 |
| A | 3 | 2019-12-13 10:10:00 |
| B | 1 | 2019-12-13 11:00:00 |
| B | 2 | 2019-12-13 11:05:00 |
| B | 3 | 2019-12-13 11:10:00 |
+--------- +----------+---------------------+
我不会向对应于间隔 1 的 ReadDateTime 列添加 5 分钟,而我会继续向其他行添加 5 分钟,直到我的 activity 更改
有一个丑陋的方法
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
def update(interval,date):
if (interval == 1):
return date
elif (interval == 2):
return date + 'add 5 min'
elif (interval == 3):
return date + 'add 10 min'
#df.dtypes
my_udf = udf(lambda x,y: update(x,y), StringType())
df.withColumn('updated_realDateTime', my_udf(df.interval, df.realDateTime) ).show(truncate=False)
当然我的更新函数不是你想要的,所以你必须改变它,但它会完成工作(如果所有间隔的模式都相同,你不需要 elifs,你可以做到动态)
这是为任何有更好答案的人创建数据框的代码
data = [ (1,'2019-12-13 10:00:00'),
(2, '2019-12-13 10:00:00'),
(3, '2019-12-13 10:00:00'),
(1, '2019-12-13 11:00:00'),
(2, '2019-12-13 11:00:00'),
(3, '2019-12-13 11:00:00')]
df = sqlContext.createDataFrame(data, ['interval','realDateTime']).cache()
感谢 Ali Yesilli 的 post,我找到了解决方案
.
我首先将我的 ReadDateTime 转换为 unix 时间戳,并且仅当我的间隔不等于 1 时才向其添加 5 分钟。所以我的代码如下所示。
from pyspark.sql.functions import *
df = df.withColumn("ReadDateTime1", when(col("Interval") != lit(1),
col("ReadDateTime") +
(col("Interval")*expr("Interval 5 minutes"))).otherwise(col('ReadDateTime')))
我有一个 pyspark 数据框
+----------+----------+---------------------+
| Activity | Interval | ReadDateTime |
+----------+----------+---------------------+
| A | 1 | 2019-12-13 10:00:00 |
| A | 2 | 2019-12-13 10:00:00 |
| A | 3 | 2019-12-13 10:00:00 |
| B | 1 | 2019-12-13 11:00:00 |
| B | 2 | 2019-12-13 11:00:00 |
| B | 3 | 2019-12-13 11:00:00 |
+--------- +----------+---------------------+
现在我必须根据前一行中的值向 ReadDateTime 列添加 5 分钟。我预期的数据框如下所示
+----------+----------+---------------------+
| Activity | Interval | ReadDateTime |
+----------+----------+---------------------+
| A | 1 | 2019-12-13 10:00:00 |
| A | 2 | 2019-12-13 10:05:00 |
| A | 3 | 2019-12-13 10:10:00 |
| B | 1 | 2019-12-13 11:00:00 |
| B | 2 | 2019-12-13 11:05:00 |
| B | 3 | 2019-12-13 11:10:00 |
+--------- +----------+---------------------+
我不会向对应于间隔 1 的 ReadDateTime 列添加 5 分钟,而我会继续向其他行添加 5 分钟,直到我的 activity 更改
有一个丑陋的方法
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
def update(interval,date):
if (interval == 1):
return date
elif (interval == 2):
return date + 'add 5 min'
elif (interval == 3):
return date + 'add 10 min'
#df.dtypes
my_udf = udf(lambda x,y: update(x,y), StringType())
df.withColumn('updated_realDateTime', my_udf(df.interval, df.realDateTime) ).show(truncate=False)
当然我的更新函数不是你想要的,所以你必须改变它,但它会完成工作(如果所有间隔的模式都相同,你不需要 elifs,你可以做到动态)
这是为任何有更好答案的人创建数据框的代码
data = [ (1,'2019-12-13 10:00:00'),
(2, '2019-12-13 10:00:00'),
(3, '2019-12-13 10:00:00'),
(1, '2019-12-13 11:00:00'),
(2, '2019-12-13 11:00:00'),
(3, '2019-12-13 11:00:00')]
df = sqlContext.createDataFrame(data, ['interval','realDateTime']).cache()
感谢 Ali Yesilli 的 post,我找到了解决方案
我首先将我的 ReadDateTime 转换为 unix 时间戳,并且仅当我的间隔不等于 1 时才向其添加 5 分钟。所以我的代码如下所示。
from pyspark.sql.functions import *
df = df.withColumn("ReadDateTime1", when(col("Interval") != lit(1),
col("ReadDateTime") +
(col("Interval")*expr("Interval 5 minutes"))).otherwise(col('ReadDateTime')))