Azure Stream Analytics: Compact sensor data per hour?

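How can I collect sensor events into hourly documents, each containing an array that holds only a subset of the original message fields?

The incoming events have the following format: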

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:14.9410000Z", "value": 1234.56}

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:19.5410000Z", "value": 1334.76}

...

I would like to get the following output for each sensor, once per hour:

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg",
  "from" : "2017-09-05T22:00:14.9410000Z", "to" : "2017-09-05T22:59:55.5410000Z",
  "data": [
    {"time": "2017-09-05T22:01:14.9410000Z", "value": 1234.56},
    {"time": "2017-09-05T22:01:19.5410000Z", "value": 1334.76},
    ....
  ]
}

I created the following query:

SELECT  PlantId, MachineId, SensorId, Unit, 
        MIN(Time) AS From, MAX(Time) AS To, 
        Collect() AS Data
INTO CosmosDBOutput
FROM SensorsInput TIMESTAMP BY CAST(time as datetime)
GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1)

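The problem is that Collect() returns an array of the complete original events, but I only want the time and value fields in it.

How can I reduce the Collect() result to just those fields?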

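Based on your description, I suggest you consider using JavaScript user-defined functions (UDFs).

You can define a custom function that removes the fields you do not need from each collected event.

For more details, you can follow these steps: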

1. Create a UDF:

2. Add the following code to the function:

// UDF that strips every field except time and value from each collected event.
function main(InputJSON) {
    // Declare the loop counter locally instead of leaking an implicit global.
    for (var i = 0; i < InputJSON.length; i++) {
        delete InputJSON[i].plantId;
        delete InputJSON[i].machineId;
        delete InputJSON[i].sensorId;
        delete InputJSON[i].unit;
    }
    return InputJSON;
}
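
As a variation on the function above, the same trimming can be written as an allow-list, so that any extra field added to the events later is dropped automatically. This is only a sketch: it assumes Collect() passes the UDF an array of records whose properties are named `time` and `value`, exactly as in the sample input, and it would be registered in place of the function above rather than alongside it.

```javascript
// Hypothetical alternative UDF body: keep only the wanted fields
// instead of deleting the unwanted ones.
// Assumption: each collected record exposes "time" and "value"
// under those exact property names.
function main(events) {
    return events.map(function (e) {
        // Build a fresh object so the original record is left untouched.
        return { time: e.time, value: e.value };
    });
}
```

The design difference is small: the delete-based version must be updated whenever a new unwanted field appears, while this one must be updated whenever a new wanted field appears.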

3. Change the query:

Note: replace UDF.remove with the name of your own UDF (UDF.yourUDFname).

SELECT
    PlantId, MachineId, SensorId, Unit,
    UDF.remove(Collect()) AS Data,
    MIN(time) AS fromdate,
    MAX(time) AS todate
INTO
    [YourOutputAlias]
FROM
    [YourInputAlias] TIMESTAMP BY time
GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1)

Result:
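
To get a feel for the document shape the job should emit, the grouping and trimming can be approximated locally in plain JavaScript. This is a rough sketch and not Stream Analytics itself: the function name `compactPerHour` is hypothetical, the hourly window is approximated by truncating the ISO timestamp to the hour (the real TumblingWindow boundary handling is not modelled), and the `fromdate`/`todate` names simply mirror the aliases used in the query above.

```javascript
// Local approximation of the hourly compaction, for illustration only.
// Assumption: events are plain objects shaped like the sample input, with
// UTC ISO 8601 timestamps that all use the same precision.
function compactPerHour(events) {
    var groups = {};
    events.forEach(function (e) {
        var hour = e.time.slice(0, 13); // e.g. "2017-09-05T22"
        var key = [e.plantId, e.machineId, e.sensorId, e.unit, hour].join("|");
        if (!groups[key]) {
            groups[key] = {
                plantId: e.plantId, machineId: e.machineId,
                sensorId: e.sensorId, unit: e.unit,
                fromdate: e.time, todate: e.time, data: []
            };
        }
        var g = groups[key];
        // Equal-precision UTC ISO strings compare chronologically as text.
        if (e.time < g.fromdate) { g.fromdate = e.time; }
        if (e.time > g.todate) { g.todate = e.time; }
        // Keep only the two fields wanted in the array.
        g.data.push({ time: e.time, value: e.value });
    });
    return Object.keys(groups).map(function (k) { return groups[k]; });
}

// Usage with the two sample events from the question.
var docs = compactPerHour([
    { plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
      time: "2017-09-05T22:00:14.9410000Z", value: 1234.56 },
    { plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
      time: "2017-09-05T22:00:19.5410000Z", value: 1334.76 }
]);
console.log(JSON.stringify(docs, null, 2));
```

Both sample events fall in the same hour for the same sensor, so they end up in a single document whose `data` array holds two `{time, value}` entries.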