Processing late events in SA

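I'm running a test in which I generated data from 30 days ago. When it is sent to the SA job, all of the input is dropped, but given the settings in the Event ordering blade I expected all of it to pass through.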

Part of the job query:

    ---------------all incoming events storage query
    SELECT stream.*
    INTO [iot-predict-SA2-ColdStorage]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime

So I expect everything pushed to the SA job to be stored in blob storage.

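When I send events that are only 5 hours old, the input is marked as late (as expected) and is processed. In the screenshot, the first marked area shows the outdated events arriving as input but producing no output (red); the second part shows the late events being processed.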

Full query:

    WITH AlertsBasedOnMin
    AS (
        SELECT stream.SensorGuid
            ,stream.Value
            ,stream.SensorName
            ,ref.AggregationTypeFlag
            ,ref.MinThreshold AS threshold
            ,ref.Count
            ,CASE 
                WHEN (ref.MinThreshold > stream.Value)
                    THEN 1
                ELSE 0
                END AS isAlert
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        WHERE ref.AggregationTypeFlag = 8
        )
        ,AlertsBasedOnMax
    AS (
        SELECT stream.SensorGuid
            ,stream.Value
            ,stream.SensorName
            ,ref.AggregationTypeFlag
            ,ref.MaxThreshold AS threshold
            ,ref.Count
            ,CASE 
                WHEN (ref.MaxThreshold < stream.Value)
                    THEN 1
                ELSE 0
                END AS isAlert
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        WHERE ref.AggregationTypeFlag = 16
        )
        ,alertMinMaxUnion
    AS (
        SELECT *
        FROM AlertsBasedOnMin

        UNION ALL

        SELECT *
        FROM AlertsBasedOnMax
        )
        ,alertMimMaxComputed
    AS (
        SELECT SUM(alertMinMaxUnion.isAlert) AS EventCount
            ,alertMinMaxUnion.SensorGuid AS SensorGuid
            ,alertMinMaxUnion.SensorName
        FROM alertMinMaxUnion
        GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
            ,alertMinMaxUnion.SensorGuid
            ,alertMinMaxUnion.Count
            ,alertMinMaxUnion.AggregationTypeFlag
            ,alertMinMaxUnion.SensorName
        HAVING SUM(alertMinMaxUnion.isAlert) > alertMinMaxUnion.Count
        )
        ,alertsMimMaxComputedMergedWithReference
    AS (
        SELECT System.TIMESTAMP [TimeStampUtc]
            ,computed.EventCount
            ,0 AS SumValue
            ,0 AS AvgValue
            ,0 AS StdDevValue
            ,computed.SensorGuid
            ,computed.SensorName
            ,ref.MinThreshold
            ,ref.MaxThreshold
            ,ref.TimeFrameInSeconds
            ,ref.Count
            ,ref.GatewayGuid
            ,ref.SensorType
            ,ref.AggregationType
            ,ref.AggregationTypeFlag
            ,ref.EmailList
            ,ref.PhoneNumberList
        FROM alertMimMaxComputed computed
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
        )
        ,alertsAggregatedByFunction
    AS (
        SELECT Count(1) AS eventCount
            ,stream.SensorGuid AS SensorGuid
            ,stream.SensorName
            ,ref.[Count] AS TriggerThreshold
            ,SUM(stream.Value) AS SumValue
            ,AVG(stream.Value) AS AvgValue
            ,STDEV(stream.Value) AS StdDevValue
            ,ref.AggregationTypeFlag AS flag
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
            ,ref.AggregationTypeFlag
            ,ref.[Count]
            ,ref.MaxThreshold
            ,ref.MinThreshold
            ,stream.SensorGuid
            ,stream.SensorName
        HAVING
            --this is an alert, so the minimum event count applies to every aggregation type below
            Count(1) >= ref.[Count]
            AND (
                --average
                (
                    ref.AggregationTypeFlag = 1
                    AND (
                        AVG(stream.Value) >= ref.MaxThreshold
                        OR AVG(stream.Value) <= ref.MinThreshold
                        )
                    )
                --sum
                OR (
                    ref.AggregationTypeFlag = 2
                    AND (
                        SUM(stream.Value) >= ref.MaxThreshold
                        OR Sum(stream.Value) <= ref.MinThreshold
                        )
                    )
                --stdev 
                OR (
                    ref.AggregationTypeFlag = 4
                    AND (
                        STDEV(stream.Value) >= ref.MaxThreshold
                        OR STDEV(stream.Value) <= ref.MinThreshold
                        )
                    )
                )
        )
        ,alertsAggregatedByFunctionMergedWithReference
    AS (
        SELECT System.TIMESTAMP [TimeStampUtc]
            ,0 AS EventCount
            ,computed.SumValue
            ,computed.AvgValue
            ,computed.StdDevValue
            ,computed.SensorGuid
            ,computed.SensorName
            ,ref.MinThreshold
            ,ref.MaxThreshold
            ,ref.TimeFrameInSeconds
            ,ref.Count
            ,ref.GatewayGuid
            ,ref.SensorType
            ,ref.AggregationType
            ,ref.AggregationTypeFlag
            ,ref.EmailList
            ,ref.PhoneNumberList
        FROM alertsAggregatedByFunction computed
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
        )
        ,allAlertsUnioned
    AS (
        SELECT *
        FROM alertsAggregatedByFunctionMergedWithReference

        UNION ALL

        SELECT *
        FROM alertsMimMaxComputedMergedWithReference
        )


    ---------------alerts storage query 
    SELECT *
    INTO [iot-predict-SA2-Alerts-ColdStorage]
    FROM allAlertsUnioned

    ---------------alerts to alert events query
    SELECT *
    INTO [iot-predict-SA2-Alerts-EventStream]
    FROM allAlertsUnioned


    ---------------alerts to stream query
    SELECT *
    INTO [iot-predict-SA2-TSI-EventStream]
    FROM allAlertsUnioned


    ---------------all incoming events storage query
    SELECT stream.*
    INTO [iot-predict-SA2-ColdStorage]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime


    ---------------all incoming events to time insights query
    SELECT stream.*
    INTO [iot-predict-SA2-TSI-AlertStream]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime

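Because you are using "TIMESTAMP BY", the Stream Analytics job's event ordering settings take effect. Please check your job's "Event ordering" settings, specifically these two:

  • Events that arrive late: the late arrival limit, anywhere from 0 seconds to 21 days.
  • Handle other events: the error-handling policy, either Drop, or Adjust (change the application time to the system clock time).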
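To make the two settings concrete, here is a simplified Python model of how they decide an event's fate. It is a sketch, not Stream Analytics' actual implementation: it assumes an event counts as late when its arrival time exceeds its TIMESTAMP BY value by more than the tolerance, and it models "Adjust" as replacing the timestamp with the arrival time, following the description above. The function name, the dates, and the 1-day tolerance are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional

# Upper bound on the late arrival limit, as stated in the answer above.
MAX_TOLERANCE = timedelta(days=21)


def apply_late_arrival_policy(
    event_time: datetime,
    arrival_time: datetime,
    tolerance: timedelta,
    policy: str,
) -> Optional[datetime]:
    """Return the timestamp the event is processed with, or None if dropped.

    Simplified model: an event is late when it arrives more than
    `tolerance` after its application time (the TIMESTAMP BY value).
    """
    if not timedelta(0) <= tolerance <= MAX_TOLERANCE:
        raise ValueError("tolerance must be between 0 seconds and 21 days")
    if policy not in ("drop", "adjust"):
        raise ValueError("policy must be 'drop' or 'adjust'")

    lateness = arrival_time - event_time
    if lateness <= tolerance:
        return event_time      # within tolerance: original timestamp is kept
    if policy == "adjust":
        return arrival_time    # assumption: moved to the system clock time
    return None                # "drop": the event never reaches the query


now = datetime(2024, 1, 31, 12, 0)
tolerance = timedelta(days=1)  # hypothetical setting larger than 5 hours

five_hours_old = apply_late_arrival_policy(now - timedelta(hours=5), now, tolerance, "drop")
thirty_days_old = apply_late_arrival_policy(now - timedelta(days=30), now, tolerance, "drop")
print(five_hours_old)   # 2024-01-31 07:00:00
print(thirty_days_old)  # None
```

Under this model, even the 21-day maximum cannot accept a 30-day-old event with the Drop policy, which is consistent with all of the test input being discarded.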

My guess is that your late arrival limit is set higher than 5 hours, which is why the 5-hour-old events can be processed.

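As you may have gathered from the above, a Stream Analytics job can only process events that are at most 21 days late ("old"). To work around this limitation, you could consider one of the following options:

  1. Remove TIMESTAMP BY; all of your window aggregations will then use the enqueued time instead. Depending on your query logic, this may produce incorrect results.
  2. Select "Adjust" as the error-handling policy. Again, depending on your query logic, this may produce incorrect results.
  3. Use the DATEADD() function to shift the application time (stream.UtcTime) closer to the present, for example TIMESTAMP BY DATEADD(day, 10, UtcTime). This works well for a one-off task where you know the time range of the events.
  4. Process the 30-day-old data with a batch job (outside of Stream Analytics).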
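Option 3 can be sanity-checked with a little arithmetic. The Python sketch below is only a stand-in for the query expression TIMESTAMP BY DATEADD(day, 10, UtcTime); it assumes lateness is measured as arrival time minus event time, and it assumes hopping windows are aligned to a fixed origin (the EPOCH value here is hypothetical). It shows that a 30-day-old event shifted by 10 days falls inside the 21-day limit, and that shifting by whole days keeps each event at the same position within a 30-second hop.

```python
from datetime import datetime, timedelta

MAX_TOLERANCE = timedelta(days=21)  # maximum late arrival limit quoted above
SHIFT = timedelta(days=10)          # mirrors DATEADD(day, 10, UtcTime)
HOP = timedelta(seconds=30)         # hop size of the HoppingWindow in the query
EPOCH = datetime(1970, 1, 1)        # hypothetical window-alignment origin


def shift_timestamp(utc_time: datetime, shift: timedelta = SHIFT) -> datetime:
    """Python stand-in for TIMESTAMP BY DATEADD(day, 10, UtcTime)."""
    return utc_time + shift


def within_tolerance(event_time: datetime, arrival_time: datetime,
                     tolerance: timedelta = MAX_TOLERANCE) -> bool:
    """True if the event arrives no later than the tolerance allows."""
    return arrival_time - event_time <= tolerance


def hop_offset(t: datetime) -> timedelta:
    """Position of t inside its 30-second hop, measured from EPOCH."""
    return (t - EPOCH) % HOP


now = datetime(2024, 1, 31, 12, 0)
old_event = datetime(2024, 1, 1, 12, 0, 17)  # roughly 30 days old
shifted = shift_timestamp(old_event)

print(within_tolerance(old_event, now))              # False
print(within_tolerance(shifted, now))                # True
print(hop_offset(old_event) == hop_offset(shifted))  # True
```

Because a day is an exact multiple of 30 seconds, the shifted events land in the same relative window slots. The output timestamps are shifted as well, though, so anything downstream would need to subtract the same 10 days.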

After chatting with someone from MS, it turned out my test needed an extra step.

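To process late events, regardless of the late event settings, the job has to be started in a way that the late events count as having been sent after the job's start time. In this particular case, that meant starting the SA job with a custom start time set to 30 days ago.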
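One way to read this explanation is as a filter on the job's start time, which would also explain why the 5-hour-old events got through while the 30-day-old ones did not. The Python sketch below is a simplified model under that assumption only: it keeps events timestamped at or after the start time and ignores the late arrival check entirely. The "running for two days" start time is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable, List


def emitted_events(event_times: Iterable[datetime], output_start: datetime) -> List[datetime]:
    """Simplified model: keep only events timestamped at or after the job's
    start time; earlier events produce no output."""
    return [t for t in event_times if t >= output_start]


now = datetime(2024, 1, 31, 12, 0)
five_hours_old = now - timedelta(hours=5)
thirty_days_old = now - timedelta(days=30)
events = [thirty_days_old, five_hours_old]

# Hypothetical: the job has been running since two days ago.
running_since = now - timedelta(days=2)
print(emitted_events(events, running_since) == [five_hours_old])  # True

# Restarted with a custom start time 30 days ago: both events are in range.
custom_start = now - timedelta(days=30)
print(len(emitted_events(events, custom_start)))  # 2
```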