Flink window operation based on event time is triggered when the watermark is less than the window end

I'm testing event time and watermarks in Flink. Here is my code.

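import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector

// Assumed event type (not shown in the question): parsed from "code,epochMillis"
case class MyEvent(code: String, time: Long)

object WatermarkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "enven-test")

    // Request a periodic watermark every 1 ms
    env.getConfig.setAutoWatermarkInterval(1L)
    val input = env.addSource(new FlinkKafkaConsumer011[String]("event-time-topic", new SimpleStringSchema(), properties))

    // Parse "code,timestamp" lines into MyEvent
    val inputMap = input.map { f =>
      val arr = f.split(",")
      val code = arr(0)
      val time = arr(1).toLong
      MyEvent(code, time)
    }

    val watermark = inputMap.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
    // 5-second tumbling event-time windows per key
    val window = watermark
      .keyBy(_.code)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new WindowFunctionTest)

    window.print()

    env.execute()
  }

  // Emits (key, count, earliest event, latest event, window start, window end)
  class WindowFunctionTest extends WindowFunction[MyEvent, (String, Int, String, String, String, String), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[MyEvent], out: Collector[(String, Int, String, String, String, String)]): Unit = {
      val list = input.toList.sortBy(_.time)
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      out.collect((key, list.size, format.format(list.head.time), format.format(list.last.time), format.format(window.getStart), format.format(window.getEnd)))
    }
  }
}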

Here is the event-time timestamp and watermark generator:

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
  // Allow events to arrive up to 10 s out of order
  val maxOutOfOrderness = 10000L

  var currentMaxTimestamp: Long = _

  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  // Last watermark returned by getCurrentWatermark (null until its first call)
  var watermark: Watermark = null
  var timestamp: Long = _

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    timestamp = element.time
    currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
    val wm = if (watermark == null) "none" else format.format(watermark.getTimestamp)
    println("timestamp:" + element.code + "," + element.time + "|" + format.format(element.time) +
      ", currentMaxTimestamp: " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) +
      ", watermark: " + wm)
    timestamp
  }

  // Called periodically: lag the max timestamp by 10 s, truncated to whole seconds
  override def getCurrentWatermark(): Watermark = {
    watermark = new Watermark((currentMaxTimestamp - maxOutOfOrderness) / 1000 * 1000)
    watermark
  }
}
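The arithmetic in `getCurrentWatermark` can be checked without Flink. This plain-Scala sketch reproduces the generator's formula and computes the start of a 5-second tumbling window (no offset) by rounding a timestamp down to a multiple of the window size. The base time `t20`, taken as 2016-04-27 19:34:20.000 in UTC+8, and the event at 19:34:35.000 are illustrative assumptions; the question does not include its data or time zone.

```scala
object WatermarkArithmetic {
  val maxOutOfOrderness = 10000L // same 10 s bound as the generator
  val windowSize = 5000L         // Time.seconds(5)

  // Same formula as getCurrentWatermark: lag by 10 s, then truncate to whole seconds
  def watermarkFor(currentMaxTimestamp: Long): Long =
    (currentMaxTimestamp - maxOutOfOrderness) / 1000 * 1000

  // Start of the tumbling window holding ts (no offset, non-negative ts):
  // round the timestamp down to a multiple of the window size
  def windowStart(ts: Long): Long = ts - ts % windowSize

  def main(args: Array[String]): Unit = {
    val t20 = 1461756860000L   // assumed: 2016-04-27 19:34:20.000 (UTC+8)
    val event35 = t20 + 15000L // hypothetical event at 19:34:35.000
    println(watermarkFor(event35) - t20) // 5000: watermark is 19:34:25.000
    println(windowStart(event35) - t20)  // 15000: event goes to [19:34:35, 19:34:40)
  }
}
```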

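Here is some test data. As I understand it, the first window computation should happen after the watermark reaches 2016-04-27 19:34:25.000. But the test output shows the computation being triggered with watermark: 2016-04-27 19:34:24.000. Can someone explain this?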
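The expectation above matches how Flink's default event-time trigger behaves: a `TimeWindow` covers `[start, end)`, its `maxTimestamp()` is `end - 1`, and `EventTimeTrigger` fires once the watermark reaches that value. The plain-Scala sketch below models only that condition (it is not Flink code); `t25`, taken as 2016-04-27 19:34:25.000 in UTC+8, is an assumed value.

```scala
object WindowFiring {
  // A TimeWindow covers [start, end); its maxTimestamp() is end - 1, and
  // EventTimeTrigger fires once the watermark reaches that timestamp.
  def fires(windowEnd: Long, watermark: Long): Boolean = watermark >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    val t25 = 1461756865000L // assumed: 19:34:25.000 (UTC+8), end of [19:34:20, 19:34:25)
    println(fires(t25, t25 - 1000)) // false: watermark 19:34:24.000
    println(fires(t25, t25))        // true:  watermark 19:34:25.000
  }
}
```

Because the generator truncates watermarks to whole seconds, the first watermark satisfying this condition is 19:34:25.000, so a window that really fired at 19:34:24.000 would indeed be surprising.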

I suggest printing the watermark in both getCurrentWatermark and extractTimestamp. That should make clear what is happening.

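The problem is that extractTimestamp is being called on the event with timestamp 19:34:35, which is the event that will cause the current watermark to advance to 19:34:25 and thereby trigger the window, and at that moment you are printing the current watermark. When the println in extractTimestamp executes, the watermark has not yet been advanced to reflect this new event. Shortly after extractTimestamp returns, however, getCurrentWatermark is called (periodically, every 1 ms given setAutoWatermarkInterval(1L)), and that call advances the current watermark to 19:34:25. Since the window [19:34:20, 19:34:25) fires once the watermark reaches its last timestamp, 19:34:24.999, this advance is what triggers the window.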
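The ordering described above can be reproduced with a small model in plain Scala. This is a hypothetical simplification, not Flink's operator: `onElement` stands in for `extractTimestamp`, `onPeriodicTimer` stands in for `getCurrentWatermark`, each event is followed by exactly one periodic call, and times are milliseconds after 19:34:20.000 with two invented events at 19:34:34.000 and 19:34:35.000.

```scala
// Hypothetical model of BoundedOutOfOrdernessGenerator's call order (not Flink code)
class GeneratorModel(maxOutOfOrderness: Long) {
  private var currentMaxTimestamp = 0L  // same default as `var ...: Long = _`
  private var watermark = Long.MinValue // nothing computed yet

  // Stands in for extractTimestamp: updates the max and returns the watermark
  // its println would show, i.e. the value from the last periodic call
  def onElement(ts: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, ts)
    watermark
  }

  // Stands in for getCurrentWatermark: the only place the watermark advances
  def onPeriodicTimer(): Long = {
    watermark = (currentMaxTimestamp - maxOutOfOrderness) / 1000 * 1000
    watermark
  }
}

object StaleWatermarkDemo {
  // Times are ms after 19:34:20.000, so the first window is [0, 5000)
  val windowMaxTimestamp: Long = 5000L - 1

  // For each event: (watermark printed in extractTimestamp,
  //                  watermark after the next periodic call,
  //                  has that watermark reached the window's maxTimestamp?)
  def run(events: Seq[Long]): Seq[(Long, Long, Boolean)] = {
    val gen = new GeneratorModel(10000L)
    gen.onPeriodicTimer() // a timer tick before the first event: -10000
    events.map { ts =>
      val printed = gen.onElement(ts)
      val advanced = gen.onPeriodicTimer()
      (printed, advanced, advanced >= windowMaxTimestamp)
    }
  }

  def main(args: Array[String]): Unit = {
    run(Seq(14000L, 15000L)).foreach(println)
    // (-10000,4000,false)
    // (4000,5000,true)
  }
}
```

The second row corresponds to the case in the question: the println inside extractTimestamp still shows 19:34:24.000 (4000), while the periodic call that follows moves the watermark to 19:34:25.000 (5000), which fires the window.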