PySpark - Group by ID & Date, and Sum in mins by a time column

I'm processing my data in Spark. The problem is similar to one I've solved in SQL with: SUM(DATEDIFF(MINUTE, '0:00:00', targetcolumn))

However, I'd like to know whether PySpark can do this, especially with only a time column?

My dataframe looks like this:

df_temp.show()

+-----------+----+---------------+
|record_date| Tag|           time|
+-----------+----+---------------+
| 2012-05-05|   A|13:14:07.000000|
| 2012-05-05|   A|13:54:08.000000|
|        ...| ...|            ...|
| 2013-01-01|   B|14:40:26.000000|
| 2013-01-01|   B|14:48:27.000000|
|        ...| ...|            ...|
| 2014-04-03|   C|17:17:30.000000|
| 2014-04-03|   C|17:47:31.000000|
+-----------+----+---------------+

Is it possible to group by record_date and Tag, and then sum the time in minutes? So it would become something like this:

+-----------+----+---------------+
|record_date| Tag|           time|
+-----------+----+---------------+
| 2012-05-05|   A|00:41:01.000000|
| 2013-01-01|   B|00:08:01.000000|
| 2014-04-03|   C|00:30:01.000000|
+-----------+----+---------------+

The time column can be in any format, e.g. 40 minutes or 0.4 hours.

Thanks

If only the two most recent rows need to be compared, a Window with the lead function can be used. In Scala:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")

// Newest time first within each (record_date, Tag) group, so that
// lead() looks at the next-older row.
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))

df
  .withColumn("time", substring($"time", 1, 8))                     // drop fractional seconds
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss")) // parse the time of day into seconds
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1) // keep only the newest row per group
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")

Output (the first row of the expected result in your example looks incorrect):

+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A  |00:40:01         |
|2013-01-01 |B  |00:08:01         |
|2014-04-03 |C  |00:30:01         |
+-----------+---+-----------------+
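
Since the question asks about PySpark, here is a minimal sketch of the same lead-based approach in the DataFrame API. It assumes the question's frame is named df_temp and, like the Scala snippet, that the session time zone is UTC so date_format renders the raw seconds as HH:mm:ss:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Newest time first within each (record_date, Tag) group.
w = Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))

result = (
    df_temp
    .withColumn("time", F.substring("time", 1, 8))
    .withColumn("unixTimestamp", F.unix_timestamp("time", "HH:mm:ss"))
    .withColumn("timeDiffSeconds",
                F.col("unixTimestamp") - F.lead("unixTimestamp", 1, 0).over(w))
    .withColumn("timeDiffFormatted",
                F.date_format(F.col("timeDiffSeconds").cast("timestamp"), "HH:mm:ss"))
    .withColumn("rownum", F.row_number().over(w))
    .where(F.col("rownum") == 1)
    .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
)
result.show(truncate=False)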

For more than two rows, the functions first and last can be used, with the Window modified to include all rows (rowsBetween):

// Frame spanning the whole partition so first()/last() see every row.
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)
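
And again for PySpark, a sketch of the first/last variant under the same df_temp and time-zone assumptions as above. Dividing timeDiffSeconds by 60 also gives the duration in minutes, matching the "any format" note in the question:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Frame spanning the whole partition so first()/last() see every row.
w_all = (Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
w_row = Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))

result = (
    df_temp
    .withColumn("time", F.substring("time", 1, 8))
    .withColumn("unixTimestamp", F.unix_timestamp("time", "HH:mm:ss"))
    .withColumn("timeDiffSeconds",
                F.first("unixTimestamp").over(w_all) - F.last("unixTimestamp").over(w_all))
    # Divide by 60 for minutes instead of an HH:mm:ss string.
    .withColumn("timeDiffMinutes", F.round(F.col("timeDiffSeconds") / 60, 2))
    .withColumn("rownum", F.row_number().over(w_row))
    .where(F.col("rownum") == 1)
    .drop("rownum", "time", "unixTimestamp")
)
result.show(truncate=False)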