不触发具有事件时间特性的 TumblingProcessingTimeWindows 处理
TumblingProcessingTimeWindows processing with event time characteristic is not triggered
我的用例非常简单,我收到包含 "event timestamp" 的事件,并希望根据事件时间聚合它们。并且输出是一个周期性的处理时间,每 10 分钟翻滚 window。
更具体地说,需要计算 7 秒的键控数据流。
- 翻滚 window 1 秒
- a 滑动 window 计算 7 秒,提前 1 秒
- a windowall 每 1s 输出所有计数
我无法对其进行集成测试(即类似于单元测试但端到端测试),因为输入具有假事件时间,不会触发
这是我的片段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val oneDayCounts = data
.map(t => (t.key1, t.key2, 1L, t.timestampMs))
.keyBy(0, 1)
.timeWindow(Time.seconds(1))
.sum(2)
val sevenDayCounts = oneDayCounts
.keyBy(0,1)
.timeWindow(Time.seconds(3), Time.seconds(1))
.sum(2)
// single reducer
sevenDayCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.process(...)
我使用 EventTime 作为时间戳,并使用 MiniClusterWithClientResource
设置了集成测试代码。还创建了一些带有事件时间戳的假数据,如 1234l、4567l 等。
可以触发 EventTimeTrigger 进行求和计算,但无法触发以下 TumblingProcessingTimeWindow。我在 IT 测试代码中有一个 Thread.sleep 的 30s 但在 30s
之后仍然没有触发
一般来说,为处理时间编写有意义的测试是一项挑战 windows,因为它们本质上是不确定的。这就是事件时间 windows 通常更受欢迎的原因之一。
在正确的地方睡觉也很难达到预期的效果。但是,使作业 运行 足够长以触发处理时间 window 的一种方法是使用包含睡眠的自定义源。一旦输入耗尽,具有有限源的 Flink 流作业就会自行关闭。最后一个值为 MAX_WATERMARK 的水印通过管道发送,这会触发所有事件时间 windows,但处理时间 windows 仅当它们仍然 运行 时才会触发约定时间到了
请参阅 了解解决此问题的黑客示例。
或者,您可以查看 https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java 以了解如何通过模拟 getCurrentProcessingTime
来测试处理时间 windows。
我的用例非常简单,我收到包含 "event timestamp" 的事件,并希望根据事件时间聚合它们。并且输出是一个周期性的处理时间,每 10 分钟翻滚 window。
更具体地说,需要计算 7 秒的键控数据流。
- 翻滚 window 1 秒
- a 滑动 window 计算 7 秒,提前 1 秒
- a windowall 每 1s 输出所有计数
我无法对其进行集成测试(即类似于单元测试但端到端测试),因为输入具有假事件时间,不会触发
这是我的片段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val oneDayCounts = data
.map(t => (t.key1, t.key2, 1L, t.timestampMs))
.keyBy(0, 1)
.timeWindow(Time.seconds(1))
.sum(2)
val sevenDayCounts = oneDayCounts
.keyBy(0,1)
.timeWindow(Time.seconds(3), Time.seconds(1))
.sum(2)
// single reducer
sevenDayCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.process(...)
我使用 EventTime 作为时间戳,并使用 MiniClusterWithClientResource
设置了集成测试代码。还创建了一些带有事件时间戳的假数据,如 1234l、4567l 等。
可以触发 EventTimeTrigger 进行求和计算,但无法触发以下 TumblingProcessingTimeWindow。我在 IT 测试代码中有一个 Thread.sleep 的 30s 但在 30s
之后仍然没有触发一般来说,为处理时间编写有意义的测试是一项挑战 windows,因为它们本质上是不确定的。这就是事件时间 windows 通常更受欢迎的原因之一。
在正确的地方睡觉也很难达到预期的效果。但是,使作业 运行 足够长以触发处理时间 window 的一种方法是使用包含睡眠的自定义源。一旦输入耗尽,具有有限源的 Flink 流作业就会自行关闭。最后一个值为 MAX_WATERMARK 的水印通过管道发送,这会触发所有事件时间 windows,但处理时间 windows 仅当它们仍然 运行 时才会触发约定时间到了
请参阅
或者,您可以查看 https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java 以了解如何通过模拟 getCurrentProcessingTime
来测试处理时间 windows。