Apache Flink - 如何结合 AssignerWithPeriodicWatermark 和 AssignerWithPunctuatedWatermark?
Apache Flink - How to Combine AssignerWithPeriodicWatermark and AssignerWithPunctuatedWatermark?
用例: 使用 EventTime 并从 Kafka 的记录中提取时间戳。
myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
.keyBy("platform")
.window(TumblingEventTimeWindows 5 mins))
.aggregate(AggFunc(), WindowFunc())
.countWindowAll(size)
.apply(someFunc)
.addSink(someSink);
我想要的:Flink 提取时间戳并为每条记录发出初始间隔(例如 20 秒)的水印,然后它可以定期发出水印(例如每 10 秒)。
原因:如果我使用PeriodicWatermark,一开始Flink只会在一段时间后发出水印,并且我的第一个window 的 5 分钟是错误的 - 比随后的 windows 中的计数大得多。我有一个解决方法,将 setAutoWatermarkInterval 设置为 100 毫秒,但这是不必要的。
目前,我必须使用 AssignerWithPeriodicWatermark 或 AssignerWithPunctuatedWatermark。我如何实施这种组合策略的方法?谢谢
在对你的水印生成器做一些不寻常的事情之前,我会仔细检查你是否正确诊断了这种情况。总的来说,事件时间 windows 应该具有确定性,并且如果呈现相同的输入,则始终产生相同的结果。如果您得到的第一个 window 的结果根据生成水印的频率而有所不同,则表明您可能有延迟事件,当水印更频繁地到达时,这些事件会被丢弃,并且能够包含在水印不太频繁。也许您的水印没有正确说明您的事件所经历的实际无序程度?或者您的水印可能基于 System.currentTimeMillis(),而不是事件时间戳?
此外,第一次 window 与其他时间不同是正常的,因为时间 windows 与纪元对齐,而不是与第一个事件对齐。当然,这会导致第一个 window 涵盖的时间比其他所有时间都短,因此您应该期望它包含的事件更少,而不是更多。
将 setAutoWatermarkInterval 设置为 100 毫秒是非常正常的事情。但是如果你真的想避免这种情况,你可以考虑一个 AssignerWithPunctuatedWatermarks,它最初 returns 每个事件的水印,然后在适当的间隔之后,returns 水印不那么频繁。
在标点水印分配器中,每个事件都会调用 extractTimestamp 和 checkAndGetNextWatermark 方法。您可以在分配器中使用一些瞬态(非 flink)状态来跟踪第一个事件的时间,或对事件进行计数,并在 checkAndGetNextWatermark 中使用该信息最终退出并停止为每个事件生成水印(通过有时从 checkAndGetNextWatermark 返回 null,而不是 Watermark)。无论何时重新启动,您的应用程序都将始终恢复为每个事件生成水印。
这不会产生具有周期性和标点分配器所有特征的分配器,它只是一个自适应标点分配器。
用例: 使用 EventTime 并从 Kafka 的记录中提取时间戳。
myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
.keyBy("platform")
.window(TumblingEventTimeWindows 5 mins))
.aggregate(AggFunc(), WindowFunc())
.countWindowAll(size)
.apply(someFunc)
.addSink(someSink);
我想要的:Flink 提取时间戳并为每条记录发出初始间隔(例如 20 秒)的水印,然后它可以定期发出水印(例如每 10 秒)。
原因:如果我使用PeriodicWatermark,一开始Flink只会在一段时间后发出水印,并且我的第一个window 的 5 分钟是错误的 - 比随后的 windows 中的计数大得多。我有一个解决方法,将 setAutoWatermarkInterval 设置为 100 毫秒,但这是不必要的。
目前,我必须使用 AssignerWithPeriodicWatermark 或 AssignerWithPunctuatedWatermark。我如何实施这种组合策略的方法?谢谢
在对你的水印生成器做一些不寻常的事情之前,我会仔细检查你是否正确诊断了这种情况。总的来说,事件时间 windows 应该具有确定性,并且如果呈现相同的输入,则始终产生相同的结果。如果您得到的第一个 window 的结果根据生成水印的频率而有所不同,则表明您可能有延迟事件,当水印更频繁地到达时,这些事件会被丢弃,并且能够包含在水印不太频繁。也许您的水印没有正确说明您的事件所经历的实际无序程度?或者您的水印可能基于 System.currentTimeMillis(),而不是事件时间戳?
此外,第一次 window 与其他时间不同是正常的,因为时间 windows 与纪元对齐,而不是与第一个事件对齐。当然,这会导致第一个 window 涵盖的时间比其他所有时间都短,因此您应该期望它包含的事件更少,而不是更多。
将 setAutoWatermarkInterval 设置为 100 毫秒是非常正常的事情。但是如果你真的想避免这种情况,你可以考虑一个 AssignerWithPunctuatedWatermarks,它最初 returns 每个事件的水印,然后在适当的间隔之后,returns 水印不那么频繁。
在标点水印分配器中,每个事件都会调用 extractTimestamp 和 checkAndGetNextWatermark 方法。您可以在分配器中使用一些瞬态(非 flink)状态来跟踪第一个事件的时间,或对事件进行计数,并在 checkAndGetNextWatermark 中使用该信息最终退出并停止为每个事件生成水印(通过有时从 checkAndGetNextWatermark 返回 null,而不是 Watermark)。无论何时重新启动,您的应用程序都将始终恢复为每个事件生成水印。
这不会产生具有周期性和标点分配器所有特征的分配器,它只是一个自适应标点分配器。