Flink 会话缺少延迟输出 Window

Late outputs missing for Flink's Session Window

在我的管道设置中,我看不到会话 Window 的侧输出。我正在使用 Flink 1.9.1

版本 1。 我有的是:

messageStream.
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

lateTradeMessages 实现 SessionWindowTimeGapExtractor 和 returns 5 秒。

我还有这个:

messageStream.getSideOutput(lateTradeMessages)
  .keyBy(tradeKeySelector)
  .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
     @Override
     public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
                   System.out.println("Process Late messages For Aggregation");
                   out.collect(new Transaction());
              }
       })
   .name("Process Late messages For Aggregation");

问题是,当我使用相同的密钥发送消息时,我从来没有看到 "Process Late messages For Aggregation" 应该错过 window 时间。

当 Session Window 通过并且我 "immediately" 为同一密钥发送了一条新消息时,它会触发新 Session Window 而不会进入 Late SideOutput。

不确定我做错了什么。

我想在这里实现的是抓住 "late events" 并尝试 重新处理它们。

如有任何帮助,我将不胜感激。


版本 2,@Dominik Wosiński 评论后:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        env.setParallelism(1);
        env.disableOperatorChaining();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
                .addSource(new FlinkKafkaConsumer<>("business",
                        new JSONKeyValueDeserializationSchema(false), properties))
                .map(new KafkaTransactionObjectMapOperator())
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(System.currentTimeMillis());
                    }

                    @Override
                    public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                        return element.messageCreationTime;
                    }
                })
                .name("Kafka Transaction Raw Data Source.");

messageStream
             .keyBy(tradeKeySelector)
             .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
             .sideOutputLateData(lateTradeMessages)
             .process(new CumulativeTransactionOperator())
             .name("Aggregate Transaction Builder");

Watermarks 正在处理中,我查看了 Flink 的 Metrics。 Window 运算符正在执行,但仍然没有延迟输出。

顺便说一句,Kafka 主题可能是空闲的,所以我必须定期发出新的 WaterMarks。


在你的情况下你使用的是ProcessingTime,这意味着系统时间用于测量DataStream中的时间流量。

对于每个事件,分配给该事件的时间戳是您在 Flink 管道中收到数据的时刻。这意味着 Flink 处理时间无法让事件乱序。因此,您的 windows 永远不会有迟到的元素。

如果您切换到 EventTime,那么对于正确的输入数据,您应该能够看到延迟的元素被传递到侧输出。

你可能应该看看 documentation,那里解释了 Flink 中的各种时间概念。

我觉得水印方法很可疑。通常,您会在此时输出最新的事件时间戳。

只是一些背景信息,这样更容易理解。

迟到事件是指在水印处理之后到事件之后的某个时间发生的事件。考虑以下示例:

event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4

您的水印方法几乎会将所有过去的事件呈现为迟到的事件(由于 1s 水印间隔,有点容忍)。这也会使再加工和追赶变得不可能。

但是,您实际上没有看到任何延迟事件,这让我更加惊讶。您能否仔细检查您的水印方法、描述您的用例并提供示例数据?很多时候,实现对于实际用例来说并不理想,应该以不同的方式解决。