Processing of late events in SA
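I'm running a test in which I generated data dated 30 days ago.
When it is sent to the SA job, all of the input is dropped, although based on the settings in the Event ordering blade I expected all of it to pass through.
Part of the job query contains: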
---------------all incoming events storage query
SELECT stream.*
INTO [iot-predict-SA2-ColdStorage]
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
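So my expectation is that everything pushed to the SA job gets stored in blob storage.
When I send events that are only 5 hours old, the input is marked as late (expected) and processed.
In the screenshot (SS), the first marked area shows the out-of-date event input but no output (red); the second part shows the late events being processed.
Full query: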
WITH AlertsBasedOnMin
AS (
SELECT stream.SensorGuid
,stream.Value
,stream.SensorName
,ref.AggregationTypeFlag
,ref.MinThreshold AS threshold
,ref.Count
,CASE
WHEN (ref.MinThreshold > stream.Value)
THEN 1
ELSE 0
END AS isAlert
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
WHERE ref.AggregationTypeFlag = 8
)
,AlertsBasedOnMax
AS (
SELECT stream.SensorGuid
,stream.Value
,stream.SensorName
,ref.AggregationTypeFlag
,ref.MaxThreshold AS threshold
,ref.Count
,CASE
WHEN (ref.MaxThreshold < stream.Value)
THEN 1
ELSE 0
END AS isAlert
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
WHERE ref.AggregationTypeFlag = 16
)
,alertMinMaxUnion
AS (
SELECT *
FROM AlertsBasedOnMin
UNION ALL
SELECT *
FROM AlertsBasedOnMax
)
,alertMimMaxComputed
AS (
SELECT SUM(alertMinMaxUnion.isAlert) AS EventCount
,alertMinMaxUnion.SensorGuid AS SensorGuid
,alertMinMaxUnion.SensorName
FROM alertMinMaxUnion
GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
,alertMinMaxUnion.SensorGuid
,alertMinMaxUnion.Count
,alertMinMaxUnion.AggregationTypeFlag
,alertMinMaxUnion.SensorName
HAVING SUM(alertMinMaxUnion.isAlert) > alertMinMaxUnion.Count
)
,alertsMimMaxComputedMergedWithReference
AS (
SELECT System.TIMESTAMP [TimeStampUtc]
,computed.EventCount
,0 AS SumValue
,0 AS AvgValue
,0 AS StdDevValue
,computed.SensorGuid
,computed.SensorName
,ref.MinThreshold
,ref.MaxThreshold
,ref.TimeFrameInSeconds
,ref.Count
,ref.GatewayGuid
,ref.SensorType
,ref.AggregationType
,ref.AggregationTypeFlag
,ref.EmailList
,ref.PhoneNumberList
FROM alertMimMaxComputed computed
JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
)
,alertsAggregatedByFunction
AS (
SELECT Count(1) AS eventCount
,stream.SensorGuid AS SensorGuid
,stream.SensorName
,ref.[Count] AS TriggerThreshold
,SUM(stream.Value) AS SumValue
,AVG(stream.Value) AS AvgValue
,STDEV(stream.Value) AS StdDevValue
,ref.AggregationTypeFlag AS flag
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
,ref.AggregationTypeFlag
,ref.[Count]
,ref.MaxThreshold
,ref.MinThreshold
,stream.SensorGuid
,stream.SensorName
HAVING
--since this is an alert, this condition applies to all of the aggregated queries
Count(1) >= ref.[Count]
AND (
--average
(
ref.AggregationTypeFlag = 1
AND (
AVG(stream.Value) >= ref.MaxThreshold
OR AVG(stream.Value) <= ref.MinThreshold
)
)
--sum
OR (
ref.AggregationTypeFlag = 2
AND (
SUM(stream.Value) >= ref.MaxThreshold
OR Sum(stream.Value) <= ref.MinThreshold
)
)
--stdev
OR (
ref.AggregationTypeFlag = 4
AND (
STDEV(stream.Value) >= ref.MaxThreshold
OR STDEV(stream.Value) <= ref.MinThreshold
)
)
)
)
,alertsAggregatedByFunctionMergedWithReference
AS (
SELECT System.TIMESTAMP [TimeStampUtc]
,0 AS EventCount
,computed.SumValue
,computed.AvgValue
,computed.StdDevValue
,computed.SensorGuid
,computed.SensorName
,ref.MinThreshold
,ref.MaxThreshold
,ref.TimeFrameInSeconds
,ref.Count
,ref.GatewayGuid
,ref.SensorType
,ref.AggregationType
,ref.AggregationTypeFlag
,ref.EmailList
,ref.PhoneNumberList
FROM alertsAggregatedByFunction computed
JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
)
,allAlertsUnioned
AS (
SELECT *
FROM alertsAggregatedByFunctionMergedWithReference
UNION ALL
SELECT *
FROM alertsMimMaxComputedMergedWithReference
)
---------------alerts storage query
SELECT *
INTO [iot-predict-SA2-Alerts-ColdStorage]
FROM allAlertsUnioned
---------------alerts to alert events query
SELECT *
INTO [iot-predict-SA2-Alerts-EventStream]
FROM allAlertsUnioned
---------------alerts to stream query
SELECT *
INTO [iot-predict-SA2-TSI-EventStream]
FROM allAlertsUnioned
---------------all incoming events storage query
SELECT stream.*
INTO [iot-predict-SA2-ColdStorage]
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
---------------all incoming events to time insights query
SELECT stream.*
INTO [iot-predict-SA2-TSI-AlertStream]
FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
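Since you are using "TIMESTAMP BY", the Stream Analytics job's event ordering settings take effect. Please check your job's "event ordering" settings, specifically these two: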
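- Late arriving events: the late arrival tolerance, which can be set between 0 seconds and 21 days.
- Handling other events: the error-handling policy, which either drops the event or adjusts its application time to the system clock time.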
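To make these two settings concrete, here is a minimal Python sketch of how they decide an event's fate. It is a simplified model, not Stream Analytics code: the function `apply_late_arrival_policy`, its parameters and the `"drop"`/`"adjust"` strings are hypothetical, and the "adjust" branch follows the description above (replace the application time with the system clock time); the service's exact adjusted value may differ.

```python
from datetime import datetime, timedelta
from typing import Optional

# Upper bound for the late arrival tolerance, as quoted in the answer.
MAX_LATE_ARRIVAL = timedelta(days=21)


def apply_late_arrival_policy(
    event_time: datetime,
    arrival_time: datetime,
    late_tolerance: timedelta,
    policy: str = "drop",
) -> Optional[datetime]:
    """Return the timestamp the event would be processed with, or None if dropped.

    Simplified model (hypothetical): an event is "too late" when its application
    time (the TIMESTAMP BY column) is older than arrival_time - late_tolerance.
    """
    if not timedelta(0) <= late_tolerance <= MAX_LATE_ARRIVAL:
        raise ValueError("late arrival tolerance must be between 0 seconds and 21 days")
    if policy not in ("drop", "adjust"):
        raise ValueError("policy must be 'drop' or 'adjust'")

    if arrival_time - event_time <= late_tolerance:
        return event_time  # within tolerance: keep the original application time
    if policy == "drop":
        return None  # too late: discarded, so no output is produced
    # "adjust" as described in the answer: use the system clock (arrival) time
    return arrival_time
```

Under this model, a 5-hour-old event passes unchanged whenever the tolerance is larger than 5 hours, while a 30-day-old event is dropped even with the maximum 21-day tolerance, which matches what the question observed.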
My guess is that your late arrival tolerance is set to more than 5 hours, which is why the 5-hour-old events are processed.
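As you can probably tell from the above, a Stream Analytics job can only process events that are at most 21 days late ("old"). To work around this limit, you can consider one of the following options: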
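- Remove TIMESTAMP BY; all of your windowed aggregations will then use the enqueued time. Depending on your query logic, this may produce incorrect results.
- Select "Adjust" as the error-handling policy. Again, depending on your query logic, this may produce incorrect results.
- Shift the application time (stream.UtcTime) to a more recent time with the DATEADD() function, e.g. TIMESTAMP BY DATEADD(day, 10, UtcTime). This works well when it is a one-time task and you know the time range of the events.
- Process the 30-day-old data with a batch job (outside Stream Analytics).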
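For the DATEADD() option, the shift only has to be large enough to bring the events back inside the tolerance window. The Python sketch below, with a hypothetical helper `minimum_shift_days`, computes the smallest whole-day shift, assuming an event is accepted when its age minus the shift is no more than the late arrival tolerance.

```python
from datetime import timedelta

# Maximum late arrival tolerance, as quoted in the answer.
MAX_LATE_ARRIVAL = timedelta(days=21)


def minimum_shift_days(
    event_age: timedelta, late_tolerance: timedelta = MAX_LATE_ARRIVAL
) -> int:
    """Smallest n such that TIMESTAMP BY DATEADD(day, n, UtcTime) is no longer too late.

    Simplified model (assumption): accepted when (event_age - n days) <= late_tolerance.
    """
    excess = event_age - late_tolerance
    if excess <= timedelta(0):
        return 0  # already inside the tolerance window, no shift needed
    # DATEADD(day, n, ...) moves time in whole days, so round the excess up
    return -(-excess // timedelta(days=1))
```

For 30-day-old events and a 21-day tolerance the minimum is 9 days, so the DATEADD(day, 10, UtcTime) example leaves a one-day margin. Keep in mind that windowed results (System.Timestamp) then carry the shifted time, so they would need to be shifted back downstream.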
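After chatting with someone from MS, it turned out that my test needed an additional step.
To have late events processed regardless of the late arrival settings, the job has to be started in a particular way: late events are treated as if they were sent at the time the job was started. So in this particular case, we had to start the SA job with a custom start time set to 30 days ago.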