Apache Flink keyBy function with field expression
Sample messages:
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}
I am trying to build a stream aggregation with the keyBy method:
DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindowAll(Time.minutes(5)).sum("value");
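The question does not show the Message class. For field expressions like keyBy("sensor") and sum("value") to resolve, Flink has to treat Message as a POJO: a public class (standalone or static nested) with a public no-argument constructor whose fields are public or exposed through getters and setters. The sketch below is a hypothetical definition; the field types (String, double, long) are assumptions, and toString() only mimics the output format shown in the question.

```java
import java.util.Locale;

// Hypothetical holder for the question's Message type (field types are assumptions).
class MessageTypes {
    // Public static nested class: satisfies Flink's POJO rules so that the
    // field expressions "sensor" and "value" can be resolved by name.
    public static class Message {
        public String sensor;
        public double value;
        public long timestamp;

        // Public no-arg constructor, required for Flink's POJO serializer.
        public Message() {}

        public Message(String sensor, double value, long timestamp) {
            this.sensor = sensor;
            this.value = value;
            this.timestamp = timestamp;
        }

        // Formats like the output lines in the question, e.g. "value": 36.000000.
        @Override
        public String toString() {
            return String.format(Locale.ROOT,
                    "{\"sensor\": \"%s\", \"value\": %f, \"timestamp\":%d }",
                    sensor, value, timestamp);
        }
    }

    public static void main(String[] args) {
        System.out.println(new Message("temp1", 36.0, 19200230L));
    }
}
```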
I expected:
{"sensor": "temp1", "value": 36.000000, "timestamp":19200230 }
{"sensor": "temp2", "value": 10.000000, "timestamp":19200230 }
But got:
{"sensor": "temp1", "value": 46.000000, "timestamp":19200230 }
What am I missing here?
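You are using timeWindowAll, inherited from the DataStream class, instead of timeWindow from KeyedDataStream (named KeyedStream in later Flink releases), so the resulting code ignores the keyBy. timeWindowAll builds a single window over the whole stream, so all five messages are summed together (3 × 12 + 2 × 5 = 46), whereas timeWindow keeps a separate window for each key.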
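To see where the 46 comes from, here is a plain-Java simulation (no Flink dependency) of what timeWindowAll(...).sum("value") computes when all five sample messages fall into the same 5-minute window. The Message stand-in and its field types are assumptions, and taking the non-summed fields from the first element is an assumption chosen to match the temp1 label in the observed output.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of a non-keyed ("all") window sum; not Flink code.
class AllWindowSumDemo {
    // Minimal stand-in for the question's Message type (field types are assumptions).
    static final class Message {
        final String sensor;
        final double value;
        final long timestamp;

        Message(String sensor, double value, long timestamp) {
            this.sensor = sensor;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // The five sample messages from the question.
    static List<Message> sampleMessages() {
        return Arrays.asList(
                new Message("temp1", 12.0, 19200230L),
                new Message("temp1", 12.0, 19200230L),
                new Message("temp1", 12.0, 19200230L),
                new Message("temp2", 5.0, 19200230L),
                new Message("temp2", 5.0, 19200230L));
    }

    // Every element lands in ONE window regardless of sensor, so the key is never
    // consulted. Non-summed fields are copied from the first element (assumption).
    static Message allWindowSum(List<Message> window) {
        Message first = window.get(0);
        double total = 0.0;
        for (Message m : window) {
            total += m.value;
        }
        return new Message(first.sensor, total, first.timestamp);
    }

    public static void main(String[] args) {
        Message result = allWindowSum(sampleMessages());
        System.out.println(result.sensor + " " + result.value); // temp1 46.0
    }
}
```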
Try this:
DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindow(Time.minutes(5)).sum("value");
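For comparison, a plain-Java simulation (again no Flink dependency, with an assumed Message stand-in) of what the keyed pipeline computes for the same five messages: keyBy("sensor") partitions the stream, and timeWindow keeps one window per key, so each sensor gets its own sum.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed window sum; not Flink code.
class KeyedWindowSumDemo {
    // Minimal stand-in for the question's Message type (field types are assumptions).
    static final class Message {
        final String sensor;
        final double value;

        Message(String sensor, double value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    // The values from the question's five sample messages (timestamps omitted).
    static List<Message> sampleMessages() {
        return Arrays.asList(
                new Message("temp1", 12.0),
                new Message("temp1", 12.0),
                new Message("temp1", 12.0),
                new Message("temp2", 5.0),
                new Message("temp2", 5.0));
    }

    // One running sum per key, like one window per sensor in a keyed stream.
    // LinkedHashMap keeps keys in first-seen order for readable output.
    static Map<String, Double> keyedWindowSum(List<Message> window) {
        Map<String, Double> sums = new LinkedHashMap<>();
        for (Message m : window) {
            sums.merge(m.sensor, m.value, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(keyedWindowSum(sampleMessages())); // {temp1=36.0, temp2=10.0}
    }
}
```

In newer Flink releases (1.12 onward) timeWindow(...) is deprecated in favor of window(...) with an explicit assigner, for example window(TumblingProcessingTimeWindows.of(Time.minutes(5))) or TumblingEventTimeWindows, and keyBy(String) is deprecated in favor of a KeySelector such as keyBy(m -> m.sensor). The keyed-versus-all-window distinction stays the same: windowAll(...) still ignores the key.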