PySpark - Group by ID & Date, and Sum in mins by a time column
I am processing my data in Spark, and the problem is similar to one I would solve in SQL with:
SUM(DATEDIFF(MINUTE, '0:00:00', targetcolumn))
However, I'd like to know whether this can be done in PySpark, especially when there is only a single time column?
My dataframe looks like this:
df_temp.show()

+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05| A |13:14:07.000000|
| 2012-05-05| A |13:54:08.000000|
...
| 2013-01-01| B |14:40:26.000000|
| 2013-01-01| B |14:48:27.000000|
...
| 2014-04-03| C |17:17:30.000000|
| 2014-04-03| C |17:47:31.000000|
Is it possible to group by record_date and Tag, and then sum up the time in minutes?
So it would become something like this:
+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05| A |00:41:01.000000|
| 2013-01-01| B |00:08:01.000000|
| 2014-04-03| C |00:30:01.000000|
The time column can be in any format, e.g. 40 minutes or 0.4 hours.
Thanks
If only the two most recent rows need to be compared, then a Window with the "lead" function can be used, here in Scala:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")

val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))

df
  .withColumn("time", substring($"time", 1, 8))                      // drop fractional seconds
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))  // time of day as seconds
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)
Output (for the first row, your expected example looks incorrect):
+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A |00:40:01 |
|2013-01-01 |B |00:08:01 |
|2014-04-03 |C |00:30:01 |
+-----------+---+-----------------+
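Since the question asks for PySpark, a rough PySpark equivalent of the Scala above might look like this. It is an untested sketch that mirrors the same steps, assumes an existing SparkSession named spark, and (like the Scala version) relies on the session time zone being UTC so that date_format renders the raw second count as HH:mm:ss:

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("2012-05-05", "A", "13:14:07.000000"),
     ("2012-05-05", "A", "13:54:08.000000"),
     ("2013-01-01", "B", "14:40:26.000000"),
     ("2013-01-01", "B", "14:48:27.000000"),
     ("2014-04-03", "C", "17:17:30.000000"),
     ("2014-04-03", "C", "17:47:31.000000")],
    ["record_date", "Tag", "time"])

record_tag_window = Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))

(df
 .withColumn("time", F.substring("time", 1, 8))                      # drop fractional seconds
 .withColumn("unixTimestamp", F.unix_timestamp("time", "HH:mm:ss"))  # time of day as seconds
 .withColumn("timeDiffSeconds",
             F.col("unixTimestamp") - F.lead("unixTimestamp", 1, 0).over(record_tag_window))
 .withColumn("timeDiffFormatted",
             F.date_format(F.col("timeDiffSeconds").cast("timestamp"), "HH:mm:ss"))
 .withColumn("rownum", F.row_number().over(record_tag_window))
 .where(F.col("rownum") == 1)
 .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
 .show(truncate=False))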
For more than two rows, the "first" and "last" functions can be used instead, with the Window modified to cover all rows of the group (rowsBetween):
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // first minus last over the descending window = latest minus earliest time in the group
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)
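A PySpark sketch of the first/last variant follows, again untested; the extra diff_minutes column is one way to express the result in minutes rather than HH:mm:ss, as the question asked (df is the dataframe built in the previous sketch):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Window covering every row of the (record_date, Tag) group, newest time first
record_tag_window = (Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))
                     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

(df
 .withColumn("time", F.substring("time", 1, 8))
 .withColumn("unixTimestamp", F.unix_timestamp("time", "HH:mm:ss"))
 .withColumn("timeDiffSeconds",
             F.first("unixTimestamp").over(record_tag_window)
             - F.last("unixTimestamp").over(record_tag_window))   # latest minus earliest
 .withColumn("timeDiffFormatted",
             F.date_format(F.col("timeDiffSeconds").cast("timestamp"), "HH:mm:ss"))
 .withColumn("diff_minutes", F.round(F.col("timeDiffSeconds") / 60, 2))  # result in minutes
 .withColumn("rownum",
             F.row_number().over(
                 Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))))
 .where(F.col("rownum") == 1)
 .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
 .show(truncate=False))

The same per-group difference could also be obtained without a window at all, by grouping on record_date and Tag and taking max minus min of the seconds column; the window form is kept here to stay close to the Scala answer above.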