How to calculate aggregations on a window when sensor readings are not sent if they haven't changed since the last event?
How can I calculate aggregations over a window from a sensor, when new events are only sent if the sensor value has changed since the last event? The sensor readings are taken at fixed times, e.g. every 5 seconds, but are only forwarded if the reading has changed since the last one.
So, if I'd like to build an average of signal_strength per device:
eventsDF = ...
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal_strength")
For example, the events sent by a device for a one-minute window:
event_time device_id signal_strength
12:00:00 1 5
12:00:05 1 4
12:00:30 1 5
12:00:45 1 6
12:00:55 1 5
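To see why a plain average over the sent events is not enough, here is a minimal plain-Python sketch (not Spark; the event list is the example above, with times in seconds):

```python
# Plain-Python sketch (not Spark): averaging only the sent readings
# ignores how long each value was actually in effect.
events = [(0, 5), (5, 4), (30, 5), (45, 6), (55, 5)]  # (seconds, signal_strength)

naive_avg = sum(s for _, s in events) / len(events)
print(naive_avg)  # 5.0, but the time-weighted answer should be 57/12 = 4.75
```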
The same dataset, filled in with the events that were never actually sent:
event_time device_id signal_strength
12:00:00 1 5
12:00:05 1 4
12:00:10 1 4
12:00:15 1 4
12:00:20 1 4
12:00:25 1 4
12:00:30 1 5
12:00:35 1 5
12:00:40 1 5
12:00:45 1 6
12:00:50 1 6
12:00:55 1 5
The signal_strength sum is 57 and the avg is 57/12 = 4.75.
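The gap-filling itself is easy to express outside of streaming; here is a plain-Python sketch (not Spark) that forward-fills the sparse readings onto a 5-second grid and aggregates the filled series:

```python
# Plain-Python sketch (not Spark): forward-fill the sparse readings onto a
# 5-second grid, then aggregate the filled series.
events = dict([(0, 5), (5, 4), (30, 5), (45, 6), (55, 5)])  # event_time -> signal_strength

filled = []
last = None
for t in range(0, 60, 5):        # one expected reading every 5 seconds
    last = events.get(t, last)   # reuse the previous value when nothing was sent
    filled.append(last)

print(sum(filled), len(filled))  # 57 12
print(sum(filled) / len(filled)) # 4.75
```

The streaming question is how to do this incrementally, per device, without materializing the full grid.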
How can I infer this missing data in Spark Structured Streaming and calculate the average from the inferred values?
Note: I'm using avg as an example aggregation, but the solution needs to work for any aggregation function.
EDITED:
I have modified the logic to compute the average from the filtered dataframe only, compensating for the gaps.
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// input structure
case class StreamInput(event_time: Long, device_id: Int, signal_strength: Int)

// columns for which we want to maintain state
case class StreamState(
  prevSum: Int,
  prevRowCount: Int,
  prevTime: Long,
  prevSignalStrength: Int,
  currentTime: Long,
  rowCount: Int,
  total_signal_strength: Int,
  avg: Double)

// final result structure
case class StreamResult(event_time: Long, device_id: Int, signal_strength: Int, avg: Double)

val filteredDF = ??? // get input (filtered rows only)
val interval = 5     // event_time interval in seconds

// use .mapGroupsWithState to maintain state for the running sum and the total row count so far;
// you need to set the timeout threshold to indicate how long you wish to maintain the state
val avgDF = filteredDF.groupByKey(_.device_id)
  .mapGroupsWithState[StreamState, StreamResult](GroupStateTimeout.NoTimeout()) {
    case (id: Int, eventIter: Iterator[StreamInput], state: GroupState[StreamState]) =>
      val events = eventIter.toSeq
      val updatedSession = if (state.exists) {
        // if state exists, update it with the new values
        val existingState = state.get
        val prevTime = existingState.currentTime
        val currentTime = events.map(_.event_time).last
        // number of rows (including missed ones) since the previous event
        val currentRowCount = ((currentTime - prevTime) / interval).toInt
        val rowCount = existingState.rowCount + currentRowCount
        val currentSignalStrength = events.map(_.signal_strength).last
        // fill the gap with the previous reading repeated (currentRowCount - 1) times
        val total_signal_strength = currentSignalStrength +
          existingState.prevSignalStrength * (currentRowCount - 1) +
          existingState.total_signal_strength
        StreamState(
          existingState.total_signal_strength,
          existingState.rowCount,
          prevTime,
          currentSignalStrength,
          currentTime,
          rowCount,
          total_signal_strength,
          total_signal_strength / rowCount.toDouble)
      } else {
        // no earlier state for this device
        val runningSum = events.map(_.signal_strength).sum
        val size = events.size.toDouble
        val currentTime = events.map(_.event_time).last
        StreamState(0, 1, 0, runningSum, currentTime, 1, runningSum, runningSum / size)
      }
      // save the updated state
      state.update(updatedSession)
      StreamResult(
        events.map(_.event_time).last,
        id,
        events.map(_.signal_strength).last,
        updatedSession.avg)
  }

val result = avgDF
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
The idea is to compute two new columns:
- rowCount: the running count of rows that should have existed if nothing had been filtered out.
- total_signal_strength: the running total of signal_strength so far (this includes the contribution of the missed rows).
Computed as:

total_signal_strength =
    current row's signal_strength +
    (previous row's signal_strength * (rowCount - 1)) +
    // rowCount here is the number of rows since the previous event, computed by comparing the previous and current event_time
    previous total_signal_strength
Format of the intermediate state:
+----------+---------+---------------+---------------------+--------+
|event_time|device_id|signal_strength|total_signal_strength|rowCount|
+----------+---------+---------------+---------------------+--------+
| 0| 1| 5| 5| 1|
| 5| 1| 4| 9| 2|
| 30| 1| 5| 30| 7|
| 45| 1| 6| 46| 10|
| 55| 1| 5| 57| 12|
+----------+---------+---------------+---------------------+--------+
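The state update above can be replayed as a plain-Python sketch (not Spark) over the five events of the example, with times in seconds; it reproduces the intermediate-state table:

```python
# Plain-Python sketch (not Spark) of the per-event state update:
# total_signal_strength and rowCount carried forward across events.
interval = 5
events = [(0, 5), (5, 4), (30, 5), (45, 6), (55, 5)]  # (event_time, signal_strength)

rows = []
prev_time = prev_signal = None
total = row_count = 0
for t, s in events:
    if prev_time is None:
        total, row_count = s, 1                          # first event seeds the state
    else:
        missed = (t - prev_time) // interval             # rows since the previous event
        total = s + prev_signal * (missed - 1) + total   # fill the gap with the previous value
        row_count += missed
    prev_time, prev_signal = t, s
    rows.append((t, s, total, row_count, total / row_count))

for r in rows:
    print(r)
# last row: (55, 5, 57, 12, 4.75)
```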
Final output:
+----------+---------+---------------+-----------------+
|event_time|device_id|signal_strength| avg|
+----------+---------+---------------+-----------------+
| 0| 1| 5| 5.0|
| 5| 1| 4| 4.5|
| 30| 1| 5|4.285714285714286|
| 45| 1| 6| 4.6|
| 55| 1| 5| 4.75|
+----------+---------+---------------+-----------------+
Mathematically this is equivalent to a duration-weighted average:

avg = sum(signal_strength * duration) / 60

The challenge here is to get the duration of each signal. One option is, for each micro-batch, to collect the result in the driver; after that it is plain arithmetic. To get the durations you can left-shift the start times by one position and subtract, something like:

window.start.leftShift(1) - window.start
which gives you:
event_time device_id signal_strength duration
12:00:00 1 5 5 (5-0)
12:00:05 1 4 25 (30-5)
12:00:30 1 5 15 (45-30)
12:00:45 1 6 10 (55-45)
12:00:55 1 5 5 (60-55)
(5*5+4*25+5*15+6*10+5*5)/60=57/12
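The shift-and-subtract step can be sketched in plain Python (not Spark); the duration of each reading is the gap to the next event, and the last reading runs to the end of the 60-second window:

```python
# Plain-Python sketch (not Spark): duration-weighted average over the window.
events = [(0, 5), (5, 4), (30, 5), (45, 6), (55, 5)]  # (event_time, signal_strength)
window_end = 60

starts = [t for t, _ in events]
# shift the start times by one position and subtract to get each duration
durations = [nxt - t for t, nxt in zip(starts, starts[1:] + [window_end])]
weighted_avg = sum(s * d for (_, s), d in zip(events, durations)) / window_end

print(durations)     # [5, 25, 15, 10, 5]
print(weighted_avg)  # 4.75
```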
As of Spark Structured Streaming 2.3.2, you would need to write your own custom sink to collect the result of each stage to the driver and do the math there.