Flink 会话缺少延迟输出 Window
Late outputs missing for Flink's Session Window
在我的管道设置中,我看不到会话 Window 的侧输出。我正在使用 Flink 1.9.1
版本 1。
我有的是:
messageStream.
.keyBy(tradeKeySelector)
.window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
lateTradeMessages 实现 SessionWindowTimeGapExtractor 和 returns 5 秒。
我还有这个:
messageStream.getSideOutput(lateTradeMessages)
.keyBy(tradeKeySelector)
.process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
@Override
public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
System.out.println("Process Late messages For Aggregation");
out.collect(new Transaction());
}
})
.name("Process Late messages For Aggregation");
问题是,当我使用相同的密钥发送消息时,我从来没有看到 "Process Late messages For Aggregation" 应该错过 window 时间。
当 Session Window 通过并且我 "immediately" 为同一密钥发送了一条新消息时,它会触发新 Session Window 而不会进入 Late SideOutput。
不确定我做错了什么。
我想在这里实现的是抓住 "late events" 并尝试
重新处理它们。
如有任何帮助,我将不胜感激。
版本 2,@Dominik Wosiński 评论后:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false), properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
Watermarks 正在处理中,我查看了 Flink 的 Metrics。 Window 运算符正在执行,但仍然没有延迟输出。
顺便说一句,Kafka 主题可能是空闲的,所以我必须定期发出新的 WaterMarks。
在你的情况下你使用的是ProcessingTime
,这意味着系统时间用于测量DataStream
中的时间流量。
对于每个事件,分配给该事件的时间戳是您在 Flink 管道中收到数据的时刻。这意味着 Flink 处理时间无法让事件乱序。因此,您的 windows 永远不会有迟到的元素。
如果您切换到 EventTime
,那么对于正确的输入数据,您应该能够看到延迟的元素被传递到侧输出。
你可能应该看看 documentation,那里解释了 Flink 中的各种时间概念。
我觉得水印方法很可疑。通常,您会在此时输出最新的事件时间戳。
只是一些背景信息,这样更容易理解。
迟到事件是指在水印处理之后到事件之后的某个时间发生的事件。考虑以下示例:
event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4
您的水印方法几乎会将所有过去的事件呈现为迟到的事件(由于 1s 水印间隔,有点容忍)。这也会使再加工和追赶变得不可能。
但是,您实际上没有看到任何延迟事件,这让我更加惊讶。您能否仔细检查您的水印方法、描述您的用例并提供示例数据?很多时候,实现对于实际用例来说并不理想,应该以不同的方式解决。
在我的管道设置中,我看不到会话 Window 的侧输出。我正在使用 Flink 1.9.1
版本 1。 我有的是:
messageStream.
.keyBy(tradeKeySelector)
.window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
lateTradeMessages 实现 SessionWindowTimeGapExtractor 和 returns 5 秒。
我还有这个:
messageStream.getSideOutput(lateTradeMessages)
.keyBy(tradeKeySelector)
.process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
@Override
public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
System.out.println("Process Late messages For Aggregation");
out.collect(new Transaction());
}
})
.name("Process Late messages For Aggregation");
问题是,当我使用相同的密钥发送消息时,我从来没有看到 "Process Late messages For Aggregation" 应该错过 window 时间。
当 Session Window 通过并且我 "immediately" 为同一密钥发送了一条新消息时,它会触发新 Session Window 而不会进入 Late SideOutput。
不确定我做错了什么。
我想在这里实现的是抓住 "late events" 并尝试 重新处理它们。
如有任何帮助,我将不胜感激。
版本 2,@Dominik Wosiński 评论后:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false), properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
Watermarks 正在处理中,我查看了 Flink 的 Metrics。 Window 运算符正在执行,但仍然没有延迟输出。
顺便说一句,Kafka 主题可能是空闲的,所以我必须定期发出新的 WaterMarks。
在你的情况下你使用的是ProcessingTime
,这意味着系统时间用于测量DataStream
中的时间流量。
对于每个事件,分配给该事件的时间戳是您在 Flink 管道中收到数据的时刻。这意味着 Flink 处理时间无法让事件乱序。因此,您的 windows 永远不会有迟到的元素。
如果您切换到 EventTime
,那么对于正确的输入数据,您应该能够看到延迟的元素被传递到侧输出。
你可能应该看看 documentation,那里解释了 Flink 中的各种时间概念。
我觉得水印方法很可疑。通常,您会在此时输出最新的事件时间戳。
只是一些背景信息,这样更容易理解。
迟到事件是指在水印处理之后到事件之后的某个时间发生的事件。考虑以下示例:
event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4
您的水印方法几乎会将所有过去的事件呈现为迟到的事件(由于 1s 水印间隔,有点容忍)。这也会使再加工和追赶变得不可能。
但是,您实际上没有看到任何延迟事件,这让我更加惊讶。您能否仔细检查您的水印方法、描述您的用例并提供示例数据?很多时候,实现对于实际用例来说并不理想,应该以不同的方式解决。