如何使用 MS Stream Analytics 将窗口平均值注入(附加)到输出中
How to inject (append) a windowed average into the output using MS Stream Analytics
给定一个类似于以下内容的 JSON 流:
{
"timestamp": "2017-01-26T20:27:26.099Z",
"Novato": {
"humidity": "40.996",
"barometric": "1011.2"
},
"Redmond": {
"humidity": "60.832",
"barometric": "1011.8"
}
}
对于这个对象中的每个城市,我想添加一个名为 humidity_5_second_avg 的新值,这是一个 5 秒的翻滚 window 平均值。
当然,对于每个城市,它都需要是唯一的。我想将它附加到现有城市的值中。
例如:
{
"timestamp": "2017-01-26T20:27:26.099Z",
"Novato": {
"humidity": "40.996",
"barometric": "1011.2",
"humidity_5_second_avg": "38.1234"
},
"Redmond": {
"humidity": "60.832",
"barometric": "1011.8",
"humidity_5_second_avg": "32.1234"
}
}
流分析查询是否可行?或者我是否需要创建两个流(一个包含原始数据,一个仅包含平均数据,然后将它们合并在一起?
要完全按照所描述的方式进行操作是很棘手的。先把城市信息按城市一行分解,再用JOIN比较容易。
-- Use CROSS APPLY to split original events into one row per city
WITH CityData AS
(
SELECT
r.PropertyName AS City,
r.PropertyValue.*
FROM localinput i TIMESTAMP BY timestamp
CROSS APPLY GetRecordProperties(i) r
WHERE r.PropertyValue.humidity IS NOT NULL
),
Averages AS
(
SELECT
City,
AVG(humidity) as avg_humidity
FROM CityData
GROUP BY city, TumblingWindow(second, 5)
)
SELECT *, System.Timestamp as ts INTO debug FROM Averages
SELECT
c.*, a.avg_humidity
FROM CityData c
JOIN Averages a
ON c.City = a.City AND DATEDIFF(second, c, a) BETWEEN 0 AND 5
给定一个类似于以下内容的 JSON 流:
{
"timestamp": "2017-01-26T20:27:26.099Z",
"Novato": {
"humidity": "40.996",
"barometric": "1011.2"
},
"Redmond": {
"humidity": "60.832",
"barometric": "1011.8"
}
}
对于这个对象中的每个城市,我想添加一个名为 humidity_5_second_avg 的新值,这是一个 5 秒的翻滚 window 平均值。
当然,对于每个城市,它都需要是唯一的。我想将它附加到现有城市的值中。
例如:
{
"timestamp": "2017-01-26T20:27:26.099Z",
"Novato": {
"humidity": "40.996",
"barometric": "1011.2",
"humidity_5_second_avg": "38.1234"
},
"Redmond": {
"humidity": "60.832",
"barometric": "1011.8",
"humidity_5_second_avg": "32.1234"
}
}
流分析查询是否可行?或者我是否需要创建两个流(一个包含原始数据,一个仅包含平均数据,然后将它们合并在一起?
要完全按照所描述的方式进行操作是很棘手的。先把城市信息按城市一行分解,再用JOIN比较容易。
-- Use CROSS APPLY to split original events into one row per city
WITH CityData AS
(
SELECT
r.PropertyName AS City,
r.PropertyValue.*
FROM localinput i TIMESTAMP BY timestamp
CROSS APPLY GetRecordProperties(i) r
WHERE r.PropertyValue.humidity IS NOT NULL
),
Averages AS
(
SELECT
City,
AVG(humidity) as avg_humidity
FROM CityData
GROUP BY city, TumblingWindow(second, 5)
)
SELECT *, System.Timestamp as ts INTO debug FROM Averages
SELECT
c.*, a.avg_humidity
FROM CityData c
JOIN Averages a
ON c.City = a.City AND DATEDIFF(second, c, a) BETWEEN 0 AND 5