Flink watermark:通过实现 WatermarkGenerator 发送水印的频率
Flink watermark: the frequency of sending watermark by implementing the WatermarkGenerator
我在官方指导中看到了下面的例子
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500;
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
允许的最大延迟为 3.5 秒。这是否意味着这将每 3.5 秒发送一个水印?如果不是,我不明白 onEvent
实现中时间戳比较的意义是什么。
BoundedOutOfOrdernessGenerator
的 onEvent
方法正在跟踪迄今为止在流中看到的最大时间戳。基于流最多 maxOutOfOrderness
乱序的假设,在看到值为 currentMaxTimestamp
的时间戳后,我们假设时间戳小于 currentMaxTimestamp - maxOutOfOrderness - 1
的所有事件都应该具有已经到了。
onPeriodicEmit
每 autoWatermarkInterval
毫秒调用一次,默认情况下为 200 毫秒。
Math.max
确保 currentMaxTimestamp
不会倒退。
我在官方指导中看到了下面的例子
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500;
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
允许的最大延迟为 3.5 秒。这是否意味着这将每 3.5 秒发送一个水印?如果不是,我不明白 onEvent
实现中时间戳比较的意义是什么。
BoundedOutOfOrdernessGenerator
的 onEvent
方法正在跟踪迄今为止在流中看到的最大时间戳。基于流最多 maxOutOfOrderness
乱序的假设,在看到值为 currentMaxTimestamp
的时间戳后,我们假设时间戳小于 currentMaxTimestamp - maxOutOfOrderness - 1
的所有事件都应该具有已经到了。
onPeriodicEmit
每 autoWatermarkInterval
毫秒调用一次,默认情况下为 200 毫秒。
Math.max
确保 currentMaxTimestamp
不会倒退。