Azure 流分析有状态聚合
Azure Streaming Analytics stateful aggregation
我有一个 IoT 中心,其中包含多个将数据发送到流分析的设备。来自设备的消息包含有关其健康状况的信息(介于 0 和 1 之间)。流分析将数据输出到服务总线,我想添加有关包含给定时刻跨设备平均运行状况的字段的信息。
我想使用用户定义的聚合每 10 秒生成一次此值,但看起来它只使用时间范围内的最后一条消息。
我使用 UDA 是否正确?如果没有,是否有任何其他方法可以跨多个设备或其他一些有状态函数求平均值?
UDA 代码:
function main() {
this.init = function () {
this.state = {};
}
this.accumulate = function (value, device_id) {
this.state[device_id] = value;
}
/*this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState) {
this.state -= otherState.state;
}*/
this.computeResult = function () {
length = 0,
total = 0;
for (var device in this.state) {
total += this.state[device];
length++;
}
return total/length;
}
}
查询:
SELECT
uda.fleetHealth(device_health_status.level, device_id) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10)
您只能获取最后一条消息,因为您在 Java 脚本中使用地图 1。 2 第二个参数始终相同且等于应用程序时间戳,即使您将其定义为 device_id。
如果你想计算所有设备的平均水平,你应该这样做:
function UDASample() {
this.init = function () {
this.state = 0;
this.length = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
this.length = length + 1;
}
/*this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState) {
this.state -= otherState.state;
}*/
this.computeResult = function () {
return this.state/this.length;
}
}
SELECT
uda.fleetHealth(device_health_status.level) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10)
如果你想统计每个设备的平均水平,你可以使用上面相同的 UDA 并使用这样的脚本:
SELECT device_id,
uda.fleetHealth(device_health_status.level) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10), device_id
我有一个 IoT 中心,其中包含多个将数据发送到流分析的设备。来自设备的消息包含有关其健康状况的信息(介于 0 和 1 之间)。流分析将数据输出到服务总线,我想添加有关包含给定时刻跨设备平均运行状况的字段的信息。
我想使用用户定义的聚合每 10 秒生成一次此值,但看起来它只使用时间范围内的最后一条消息。
我使用 UDA 是否正确?如果没有,是否有任何其他方法可以跨多个设备或其他一些有状态函数求平均值?
UDA 代码:
function main() {
this.init = function () {
this.state = {};
}
this.accumulate = function (value, device_id) {
this.state[device_id] = value;
}
/*this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState) {
this.state -= otherState.state;
}*/
this.computeResult = function () {
length = 0,
total = 0;
for (var device in this.state) {
total += this.state[device];
length++;
}
return total/length;
}
}
查询:
SELECT
uda.fleetHealth(device_health_status.level, device_id) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10)
您只能获取最后一条消息,因为您在 Java 脚本中使用地图 1。 2 第二个参数始终相同且等于应用程序时间戳,即使您将其定义为 device_id。 如果你想计算所有设备的平均水平,你应该这样做:
function UDASample() {
this.init = function () {
this.state = 0;
this.length = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
this.length = length + 1;
}
/*this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState) {
this.state -= otherState.state;
}*/
this.computeResult = function () {
return this.state/this.length;
}
}
SELECT
uda.fleetHealth(device_health_status.level) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10)
如果你想统计每个设备的平均水平,你可以使用上面相同的 UDA 并使用这样的脚本:
SELECT device_id,
uda.fleetHealth(device_health_status.level) as avg_health
INTO
bustopic2
FROM
iotdata
GROUP BY TumblingWindow(second, 10), device_id