在 Spark Streaming 中使用 updateStateByKey() 从原始事件流中生成状态变化流
Usage of updateStateByKey() in Spark Streaming to produce a stream of state changes from a stream of raw events
当我遇到 updateStateByKey() 函数时,我才开始四处寻找使用 Spark Streaming 进行有状态计算的解决方案。
我要解决的问题:
10,000 个传感器每分钟产生一个二进制值。
如果传感器报告的连续值彼此不同,我想标记它并将其作为状态更改事件发送到 Kafka。
我的假设是可以在此示例中使用 updateStateByKey(),但是我并不完全了解实现相同方法的推荐方法。
我假设您将从传感器获得 (String, Int) 对流,其中 String 是传感器的 ID,Int 是传感器返回的二进制值。有了这个假设,你可以尝试这样的事情:
val sensorData: DStream[(String, Int)] = ???
val state = sensorData.updateStateByKey[(String, Int)](updateFunction _)
def updateFunction(newValues: Seq[(String, Int)], currentValues: Seq[(String, Int)]) = {
val newValuesMap = newValues.toMap
val currentValuesMap = currentValues.toMap
currentValuesMap.keys.foreach ( (id) =>
if(currrentValuesMap.get(id) != newValuesMap.getOrElse(id, -1)) {
//send to Kafka
}
)
Some(newValues)
}
有什么办法可以管理国家的life/resource?还是一直在增长?
例如在 this sessionization example 中,状态将永远增长,对吗?有什么方法可以管理它,这样你就可以 purge/archive 仅 3 个月的聚合数据或其他什么?
当我遇到 updateStateByKey() 函数时,我才开始四处寻找使用 Spark Streaming 进行有状态计算的解决方案。
我要解决的问题: 10,000 个传感器每分钟产生一个二进制值。
如果传感器报告的连续值彼此不同,我想标记它并将其作为状态更改事件发送到 Kafka。
我的假设是可以在此示例中使用 updateStateByKey(),但是我并不完全了解实现相同方法的推荐方法。
我假设您将从传感器获得 (String, Int) 对流,其中 String 是传感器的 ID,Int 是传感器返回的二进制值。有了这个假设,你可以尝试这样的事情:
val sensorData: DStream[(String, Int)] = ???
val state = sensorData.updateStateByKey[(String, Int)](updateFunction _)
def updateFunction(newValues: Seq[(String, Int)], currentValues: Seq[(String, Int)]) = {
val newValuesMap = newValues.toMap
val currentValuesMap = currentValues.toMap
currentValuesMap.keys.foreach ( (id) =>
if(currrentValuesMap.get(id) != newValuesMap.getOrElse(id, -1)) {
//send to Kafka
}
)
Some(newValues)
}
有什么办法可以管理国家的life/resource?还是一直在增长?
例如在 this sessionization example 中,状态将永远增长,对吗?有什么方法可以管理它,这样你就可以 purge/archive 仅 3 个月的聚合数据或其他什么?