Azure Stream Analytics - Error with customized "timestamp by" while applying window tumbling

I have a JSON file like the following:

{"imei": {"imei": "358174069248418F", "imeiBinary": "NYF0BpJIQY8=","imeiNotEncoded": "358174069248418","valid": 1},"dataPackets": [["msy.mxp.datapacket.AlarmNotification",{"version": 1, "id": 21, "op": 2,"sizeDynamic": 0, "alarmStatus": 4}],["msy.mxp.datapacket.IOStatus",{"version": 1,"id": 15, "op": 2,"sizeDynamic": 0,"ioStatus": 135,"ioDirections": 120}], ["msy.mxp.datapacket.LogicalStatus",{"version": 1,"id": 16, "op": 2,"sizeDynamic": 0,"logicalStatus": 5} ],[ "msy.mxp.datapacket.Position", {"version": 1,"id": 19,"op": 2,"latitude": 40.835243,"longitude": 14.246057,"altitude": 40,"speed": 0, "course": 68, "gpsNumSatellite": 5,"glonassNumSatellite": 1,"fixValid": 1,"timeValid": 1,"wgs84degMinFormat": 1, "glonass": 1,"fixMode": 3,"timestamp": {"timeSecFrom1Gen2000": 925560202,"time": 1490648755000 }, "sizeDynamic": 0} ] ]}

I am reading it with the following query:

    WITH Datapackets AS
    (
        SELECT imei.imei as imei,
               persistent as persistent,
               [timestamp].[time] as input_time,
               compressed as compressed,
               GetArrayElement(dataPackets, 3) as position
        FROM h24
    ), one as (
        SELECT *,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'timestamp') as position_timestamp --1st
        FROM Datapackets
    ), two as (
        SELECT imei,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'op') as position_OP,
               [position_timestamp].[time] as position_time,
               dateadd(S, [position_timestamp].[timeSecFrom1Gen2000], '1970-01-01') as timing,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'latitude') as position_latitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'longitude') as position_longitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'altitude') as position_altitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'speed') as position_speed
        FROM one
    )
    SELECT * FROM two

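Now I want to group by a 30-second tumbling window, as shown below, but I get an error saying that the timestamp property is not allowed on the input "two". Here is the query I used: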

    WITH Datapackets AS
    (
        SELECT imei.imei as imei,
               persistent as persistent,
               [timestamp].[time] as input_time,
               compressed as compressed,
               GetArrayElement(dataPackets, 3) as position
        FROM h24
    ), one as (
        SELECT *,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'timestamp') as position_timestamp --1st
        FROM Datapackets
    ), two as (
        SELECT imei,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'op') as position_OP,
               [position_timestamp].[time] as position_time,
               dateadd(S, [position_timestamp].[timeSecFrom1Gen2000], '1970-01-01') as timing,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'latitude') as position_latitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'longitude') as position_longitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'altitude') as position_altitude,
               GetRecordPropertyValue(GetArrayElement(position, 1), 'speed') as position_speed
        FROM one
    )
    SELECT imei, System.TimeStamp AS 'start', Avg(position_speed), max(position_latitude)
    FROM two TIMESTAMP BY TIMING GROUP BY imei, TumblingWindow(duration(second, 30))

The error occurs in the last two lines (FROM two TIMESTAMP BY TIMING).

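Update: after more searching, I found that TIMESTAMP BY can only be used on an input. This option assigns a custom timestamp to events; by default, events are timestamped with their arrival time (https://msdn.microsoft.com/en-us/library/mt573293.aspx).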

Now my question is: how can I timestamp my events using the time field recorded in the third-level array of the JSON file, so that I can run aggregations?

Any suggestions on how to handle this would be appreciated. Thanks.

According to https://msdn.microsoft.com/en-us/library/mt598501.aspx, TIMESTAMP BY can only be used on input sources, so you probably want to make it part of the first step:

    WITH Datapackets AS
    ...
    FROM h24 TIMESTAMP BY (expression)
    ...
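
To make the "(expression)" part more concrete, here is a minimal sketch of what the whole query might look like with TIMESTAMP BY moved to the h24 input. It is untested against a live job and rests on several assumptions that are not confirmed by the question: that the Position packet is always the fourth element of dataPackets, that its timestamp.time field is Unix epoch time in milliseconds, and that the values need explicit casts. The aliases window_end, avg_speed and max_latitude are made up for illustration.

```sql
-- Sketch only. Assumptions: the Position packet sits at index 3 of dataPackets,
-- and timestamp.time is Unix epoch time in milliseconds.
WITH Datapackets AS
(
    SELECT
        imei.imei AS imei,
        GetArrayElement(GetArrayElement(dataPackets, 3), 1) AS position
    FROM h24
    -- The custom timestamp is assigned here, on the input itself.
    TIMESTAMP BY DATEADD(
        millisecond,
        CAST(GetRecordPropertyValue(
                 GetRecordPropertyValue(
                     GetArrayElement(GetArrayElement(dataPackets, 3), 1),
                     'timestamp'),
                 'time') AS bigint),
        '1970-01-01T00:00:00Z')
), two AS (
    SELECT
        imei,
        CAST(GetRecordPropertyValue(position, 'speed') AS float) AS position_speed,
        CAST(GetRecordPropertyValue(position, 'latitude') AS float) AS position_latitude
    FROM Datapackets
)
-- No TIMESTAMP BY here: the later steps inherit the event time set on h24.
SELECT
    imei,
    System.Timestamp AS window_end,
    AVG(position_speed) AS avg_speed,
    MAX(position_latitude) AS max_latitude
FROM two
GROUP BY imei, TumblingWindow(second, 30)
```

If the timeSecFrom1Gen2000 field is preferred instead, the same shape should apply with second as the date part and whatever base date that field is actually counted from.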

Also, from the same source, regarding System.TimeStamp when used with GROUP BY over a window:

The timestamp of the result of the aggregate is the end of the time window to which this result corresponds.

So when you write System.TimeStamp in the final SELECT statement, it refers to the end of the current window.
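
Since the question aliases System.TimeStamp as 'start', it may help to see how a window start could be derived from the window end. The fragment below is a sketch of only the final SELECT; it assumes the step two from the question is kept and that TIMESTAMP BY has already been moved to the h24 input. The aliases window_start and window_end are hypothetical.

```sql
-- Sketch of the final SELECT only; "two" is the step defined in the question.
-- System.Timestamp is the END of each 30-second window, so the start is
-- obtained by subtracting the window length.
SELECT
    imei,
    DATEADD(second, -30, System.Timestamp) AS window_start,
    System.Timestamp AS window_end,
    AVG(position_speed) AS avg_speed,
    MAX(position_latitude) AS max_latitude
FROM two
GROUP BY imei, TumblingWindow(second, 30)
```

The offset has to match the window size, so changing the TumblingWindow duration means changing the DATEADD argument as well.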