不触发具有事件时间特性的 TumblingProcessingTimeWindows 处理

TumblingProcessingTimeWindows processing with event time characteristic is not triggered

我的用例非常简单,我收到包含 "event timestamp" 的事件,并希望根据事件时间聚合它们。并且输出是一个周期性的处理时间,每 10 分钟翻滚 window。

更具体地说,需要计算 7 秒的键控数据流。

  1. 翻滚 window 1 秒
  2. a 滑动 window 计算 7 秒,提前 1 秒
  3. a windowall 每 1s 输出所有计数

我无法对其进行集成测试(即类似于单元测试但端到端测试),因为输入具有假事件时间,不会触发

这是我的片段

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val oneDayCounts = data
  .map(t => (t.key1, t.key2, 1L, t.timestampMs))
  .keyBy(0, 1)
  .timeWindow(Time.seconds(1))
  .sum(2)

val sevenDayCounts = oneDayCounts
  .keyBy(0,1)
  .timeWindow(Time.seconds(3), Time.seconds(1))
  .sum(2)

// single reducer
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(...)

我使用 EventTime 作为时间戳,并使用 MiniClusterWithClientResource 设置了集成测试代码。还创建了一些带有事件时间戳的假数据,如 1234l、4567l 等。

可以触发 EventTimeTrigger 进行求和计算,但无法触发以下 TumblingProcessingTimeWindow。我在 IT 测试代码中有一个 Thread.sleep 的 30s 但在 30s

之后仍然没有触发

一般来说,为处理时间编写有意义的测试是一项挑战 windows,因为它们本质上是不确定的。这就是事件时间 windows 通常更受欢迎的原因之一。

在正确的地方睡觉也很难达到预期的效果。但是,使作业 运行 足够长以触发处理时间 window 的一种方法是使用包含睡眠的自定义源。一旦输入耗尽,具有有限源的 Flink 流作业就会自行关闭。最后一个值为 MAX_WATERMARK 的水印通过管道发送,这会触发所有事件时间 windows,但处理时间 windows 仅当它们仍然 运行 时才会触发约定时间到了

请参阅 了解解决此问题的黑客示例。

或者,您可以查看 https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java 以了解如何通过模拟 getCurrentProcessingTime 来测试处理时间 windows。