Flink 1.12: deprecation of the timeWindowAll/timeWindow operators
I want to upgrade my Flink application from 1.11.0 to 1.13.2.
My codebase stopped compiling because StreamExecutionEnvironment.setStreamTimeCharacteristic and timeWindow() are deprecated.
Some code:
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 10000L))
    env.getConfig.setGlobalJobParameters(parameterTool)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // #1
    env.getConfig.setParallelism(parallelism)

    val packetSource = env
      .addSource(
        new FlinkKinesisConsumer[String](s"packet-stream-name", new SimpleStringSchema, consumerConfig))
      .map(json => read[MyCaseClass](json))
      .assignTimestampsAndWatermarks(WatermarkStrategy
        .forBoundedOutOfOrderness[MyCaseClass](Duration.ofSeconds(2))
        .withTimestampAssigner(new PacketWatermarkGenerator))
      .timeWindowAll(Time.seconds(2)) // #2
      .process(new OrderPacketWindowFunction)

    val heartbeatEvents = packets
      .timeWindow(Time.seconds(4)) // #3
      .process(new HeartbeatWindowFunction)
Since I have env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) configured, it looks like I need to do the following:
- Remove the redundant configuration env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- Replace .timeWindowAll(Time.seconds(2)) with .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
- Replace .timeWindow(Time.seconds(4)) with .window(TumblingEventTimeWindows.of(Time.seconds(4)))
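The three steps above can be sketched as a small standalone job. This is only an illustration under stated assumptions, not the original application: the `Packet` case class, the `latest` helper, the in-memory `fromElements` source, and the `deviceId` key are hypothetical stand-ins for `MyCaseClass`, the window functions, the Kinesis consumer, and whatever key the real `packets` stream uses. It assumes the Flink 1.13 Scala DataStream API is on the classpath.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type standing in for MyCaseClass.
case class Packet(deviceId: String, timestampMillis: Long)

object WindowMigrationSketch {

  // Hypothetical reduce logic: keep the packet with the later timestamp.
  def latest(a: Packet, b: Packet): Packet =
    if (a.timestampMillis >= b.timestampMillis) a else b

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 1: no setStreamTimeCharacteristic call. Event time has been the
    // default time characteristic since Flink 1.12.

    val watermarks = WatermarkStrategy
      .forBoundedOutOfOrderness[Packet](Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[Packet] {
        override def extractTimestamp(p: Packet, recordTimestamp: Long): Long =
          p.timestampMillis
      })

    // In-memory source used here only so the sketch runs without Kinesis.
    val packets = env
      .fromElements(Packet("a", 1000L), Packet("b", 1500L), Packet("a", 5000L))
      .assignTimestampsAndWatermarks(watermarks)

    // Step 2: was .timeWindowAll(Time.seconds(2))
    packets
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
      .reduce((a: Packet, b: Packet) => latest(a, b))
      .print()

    // Step 3: was .timeWindow(Time.seconds(4)) on a keyed stream
    packets
      .keyBy(_.deviceId)
      .window(TumblingEventTimeWindows.of(Time.seconds(4)))
      .reduce((a: Packet, b: Packet) => latest(a, b))
      .print()

    env.execute("window-migration-sketch")
  }
}
```

The explicit window assigner makes the time semantics visible at the call site: a job that needs processing-time windows would use `TumblingProcessingTimeWindows` instead of relying on a global setting.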