Spark SQL 数据框:跨行计算的最佳方式
Spark SQL dataframe: best way to compute across rowpairs
我有一个 Spark DataFrame "deviceDF" 像这样:
ID date_time state
a 2015-12-11 4:30:00 up
a 2015-12-11 5:00:00 down
a 2015-12-11 5:15:00 up
b 2015-12-12 4:00:00 down
b 2015-12-12 4:20:00 up
a 2015-12-12 10:15:00 down
a 2015-12-12 10:20:00 up
b 2015-12-14 15:30:00 down
我正在尝试计算每个 ID 的停机时间。我从基于 id 的分组开始,然后分别计算所有正常运行时间和停机时间的总和。然后取正常运行时间和停机时间之和。
val downtimeDF = deviceDF.filter($"state" === "down")
.groupBy("ID")
.agg(sum(unix_timestamp($"date_time")) as "down_time")
val uptimeDF = deviceDF.filter($"state" === "up")
.groupBy("ID")
.agg(sum(unix_timestamp($"date_time")) as "up_time")
val updownjoinDF = uptimeDF.join(downtimeDF, "ID")
val difftimeDF = updownjoinDF
.withColumn("diff_time", $"up_time" - $"down_time")
然而,导致错误的条件很少,例如设备出现故障但从未恢复,在这种情况下,down_time 是 current_time 和 [=26= 之间的差异】 下来了。
此外,如果特定设备的第一个条目以 'up' 开头,则 down_time 是 first_entry 与此分析开始时的时间之差,例如 2015- 12-1100:00:00。使用数据框处理这些边界条件的最佳方法是什么?我需要编写自定义 UDAF 吗?
您可以尝试的第一件事是使用 window 函数。虽然这通常不是最快的解决方案,但它简洁且极具表现力。以你的数据为例:
import org.apache.spark.sql.functions.unix_timestamp
val df = sc.parallelize(Array(
("a", "2015-12-11 04:30:00", "up"), ("a", "2015-12-11 05:00:00", "down"),
("a", "2015-12-11 05:15:00", "up"), ("b", "2015-12-12 04:00:00", "down"),
("b", "2015-12-12 04:20:00", "up"), ("a", "2015-12-12 10:15:00", "down"),
("a", "2015-12-12 10:20:00", "up"), ("b", "2015-12-14 15:30:00", "down")))
.toDF("ID", "date_time", "state")
.withColumn("timestamp", unix_timestamp($"date_time"))
让我们定义示例 window:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, when, sum}
val w = Window.partitionBy($"ID").orderBy($"timestamp")
一些辅助列
val previousTimestamp = coalesce(lag($"timestamp", 1).over(w), $"timestamp")
val previousState = coalesce(lag($"state", 1).over(w), $"state")
val downtime = when(
previousState === "down",
$"timestamp" - previousTimestamp
).otherwise(0).alias("downtime")
val uptime = when(
previousState === "up",
$"timestamp" - previousTimestamp
).otherwise(0).alias("uptime")
最后是一个基本查询:
val upsAndDowns = df.select($"*", uptime, downtime)
upsAndDowns.show
// +---+-------------------+-----+----------+------+--------+
// | ID| date_time|state| timestamp|uptime|downtime|
// +---+-------------------+-----+----------+------+--------+
// | a|2015-12-11 04:30:00| up|1449804600| 0| 0|
// | a|2015-12-11 05:00:00| down|1449806400| 1800| 0|
// | a|2015-12-11 05:15:00| up|1449807300| 0| 900|
// | a|2015-12-12 10:15:00| down|1449911700|104400| 0|
// | a|2015-12-12 10:20:00| up|1449912000| 0| 300|
// | b|2015-12-12 04:00:00| down|1449889200| 0| 0|
// | b|2015-12-12 04:20:00| up|1449890400| 0| 1200|
// | b|2015-12-14 15:30:00| down|1450103400|213000| 0|
// +---+-------------------+-----+----------+------+--------+
以类似的方式,您可以向前看,如果组中没有更多记录,您可以使用当前时间戳调整总数 uptime
/ downtime
。
Window 函数提供了一些其他有用的功能,例如 window 带有 ROWS BETWEEN
和 RANGE BETWEEN
子句的定义。
另一种可能的解决方案是将数据移动到 RDD 并使用 RangePartitioner
、mapPartitions
和滑动 windows 的低级操作。对于基本的东西,你甚至可以 groupBy
。这需要更多的努力,但也更加灵活。
最后有一个来自 Cloudera 的 spark-timeseries
包。文档接近 non-existent,但测试足够全面,可以让您了解如何使用它。
关于自定义 UDAF,我不会乐观。 UDAF API 相当具体而且不够灵活。
我有一个 Spark DataFrame "deviceDF" 像这样:
ID date_time state
a 2015-12-11 4:30:00 up
a 2015-12-11 5:00:00 down
a 2015-12-11 5:15:00 up
b 2015-12-12 4:00:00 down
b 2015-12-12 4:20:00 up
a 2015-12-12 10:15:00 down
a 2015-12-12 10:20:00 up
b 2015-12-14 15:30:00 down
我正在尝试计算每个 ID 的停机时间。我从基于 id 的分组开始,然后分别计算所有正常运行时间和停机时间的总和。然后取正常运行时间和停机时间之和。
val downtimeDF = deviceDF.filter($"state" === "down")
.groupBy("ID")
.agg(sum(unix_timestamp($"date_time")) as "down_time")
val uptimeDF = deviceDF.filter($"state" === "up")
.groupBy("ID")
.agg(sum(unix_timestamp($"date_time")) as "up_time")
val updownjoinDF = uptimeDF.join(downtimeDF, "ID")
val difftimeDF = updownjoinDF
.withColumn("diff_time", $"up_time" - $"down_time")
然而,导致错误的条件很少,例如设备出现故障但从未恢复,在这种情况下,down_time 是 current_time 和 [=26= 之间的差异】 下来了。
此外,如果特定设备的第一个条目以 'up' 开头,则 down_time 是 first_entry 与此分析开始时的时间之差,例如 2015- 12-1100:00:00。使用数据框处理这些边界条件的最佳方法是什么?我需要编写自定义 UDAF 吗?
您可以尝试的第一件事是使用 window 函数。虽然这通常不是最快的解决方案,但它简洁且极具表现力。以你的数据为例:
import org.apache.spark.sql.functions.unix_timestamp
val df = sc.parallelize(Array(
("a", "2015-12-11 04:30:00", "up"), ("a", "2015-12-11 05:00:00", "down"),
("a", "2015-12-11 05:15:00", "up"), ("b", "2015-12-12 04:00:00", "down"),
("b", "2015-12-12 04:20:00", "up"), ("a", "2015-12-12 10:15:00", "down"),
("a", "2015-12-12 10:20:00", "up"), ("b", "2015-12-14 15:30:00", "down")))
.toDF("ID", "date_time", "state")
.withColumn("timestamp", unix_timestamp($"date_time"))
让我们定义示例 window:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, when, sum}
val w = Window.partitionBy($"ID").orderBy($"timestamp")
一些辅助列
val previousTimestamp = coalesce(lag($"timestamp", 1).over(w), $"timestamp")
val previousState = coalesce(lag($"state", 1).over(w), $"state")
val downtime = when(
previousState === "down",
$"timestamp" - previousTimestamp
).otherwise(0).alias("downtime")
val uptime = when(
previousState === "up",
$"timestamp" - previousTimestamp
).otherwise(0).alias("uptime")
最后是一个基本查询:
val upsAndDowns = df.select($"*", uptime, downtime)
upsAndDowns.show
// +---+-------------------+-----+----------+------+--------+
// | ID| date_time|state| timestamp|uptime|downtime|
// +---+-------------------+-----+----------+------+--------+
// | a|2015-12-11 04:30:00| up|1449804600| 0| 0|
// | a|2015-12-11 05:00:00| down|1449806400| 1800| 0|
// | a|2015-12-11 05:15:00| up|1449807300| 0| 900|
// | a|2015-12-12 10:15:00| down|1449911700|104400| 0|
// | a|2015-12-12 10:20:00| up|1449912000| 0| 300|
// | b|2015-12-12 04:00:00| down|1449889200| 0| 0|
// | b|2015-12-12 04:20:00| up|1449890400| 0| 1200|
// | b|2015-12-14 15:30:00| down|1450103400|213000| 0|
// +---+-------------------+-----+----------+------+--------+
以类似的方式,您可以向前看,如果组中没有更多记录,您可以使用当前时间戳调整总数 uptime
/ downtime
。
Window 函数提供了一些其他有用的功能,例如 window 带有 ROWS BETWEEN
和 RANGE BETWEEN
子句的定义。
另一种可能的解决方案是将数据移动到 RDD 并使用 RangePartitioner
、mapPartitions
和滑动 windows 的低级操作。对于基本的东西,你甚至可以 groupBy
。这需要更多的努力,但也更加灵活。
最后有一个来自 Cloudera 的 spark-timeseries
包。文档接近 non-existent,但测试足够全面,可以让您了解如何使用它。
关于自定义 UDAF,我不会乐观。 UDAF API 相当具体而且不够灵活。