如何使用 MS Stream Analytics 将窗口平均值注入(附加)到输出中

How to inject (append) a windowed average into the output using MS Stream Analytics

给定一个类似于以下内容的 JSON 流:

{ "timestamp": "2017-01-26T20:27:26.099Z", "Novato": { "humidity": "40.996", "barometric": "1011.2" }, "Redmond": { "humidity": "60.832", "barometric": "1011.8" } }

对于这个对象中的每个城市,我想添加一个名为 humidity_5_second_avg 的新值,这是一个 5 秒的翻滚 window 平均值。

当然,对于每个城市,它都需要是唯一的。我想将它附加到现有城市的值中。

例如:

{ "timestamp": "2017-01-26T20:27:26.099Z", "Novato": { "humidity": "40.996", "barometric": "1011.2", "humidity_5_second_avg": "38.1234" }, "Redmond": { "humidity": "60.832", "barometric": "1011.8", "humidity_5_second_avg": "32.1234" } }

流分析查询是否可行?或者我是否需要创建两个流(一个包含原始数据,一个仅包含平均数据,然后将它们合并在一起?

要完全按照所描述的方式进行操作是很棘手的。先把城市信息按城市一行分解,再用JOIN比较容易。

-- Use CROSS APPLY to split original events into one row per city
WITH CityData AS
(
    SELECT 
        r.PropertyName AS City,
        r.PropertyValue.*
    FROM localinput i TIMESTAMP BY timestamp 
    CROSS APPLY GetRecordProperties(i) r
    WHERE r.PropertyValue.humidity IS NOT NULL
),
Averages AS
(
    SELECT 
        City,
        AVG(humidity) as avg_humidity 
    FROM CityData
    GROUP BY city, TumblingWindow(second, 5)
)

SELECT *, System.Timestamp as ts INTO debug FROM Averages

SELECT 
    c.*, a.avg_humidity
FROM CityData c
JOIN Averages a
ON c.City = a.City AND DATEDIFF(second, c, a) BETWEEN 0 AND 5