Flink 1.12 timeWindowAll/timeWindow 运算符弃用

Flink 1.12 timeWindowAll/timeWindow operators deprecation

我想更新我的 flink 应用程序 1.11.0 -> 1.13.2 由于 StreamExecutionEnvironment.setStreamTimeCharacteristictimeWindow()

的贬值,我的代码库停止编译

一些代码

    val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20,10000L))
      env.getConfig.setGlobalJobParameters(parameterTool)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) #1
      env.getConfig.setParallelism(parallelism)

    val packetSource = env
      .addSource(
        new FlinkKinesisConsumer[String](s"packet-stream-name", new SimpleStringSchema, consumerConfig))
      .map(json => read[MyCaseClass](json))
      .assignTimestampsAndWatermarks(WatermarkStrategy
        .forBoundedOutOfOrderness[MyCaseClass](Duration.ofSeconds(2))
        .withTimestampAssigner(new PacketWatermarkGenerator))
      .timeWindowAll(Time.seconds(2))  #2
      .process(new OrderPacketWindowFunction)

    val heartbeatEvents = packets
      .timeWindow(Time.seconds(4)) #3
      .process(new HeartbeatWindowFunction)

由于我有 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 配置,看来我需要执行以下操作

  1. 删除冗余配置 - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  2. 替换.timeWindowAll(Time.seconds(2)) -> .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
  3. .timeWindow(Time.seconds(4)) -> .window(TumblingEventTimeWindows.of(Time.seconds(4)))
  1. 删除冗余配置 - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  2. 替换.timeWindowAll(Time.seconds(2)) -> .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))

  3. 替换.timeWindow(Time.seconds(4)) -> .window(TumblingEventTimeWindows.of(Time.seconds(4)))