Apache Flink 是否支持具有相同时间戳的多个事件?

Does Apache Flink support multiple events with the same timestamp?

在某些情况下,Apache Flink 似乎无法很好地处理具有相同时间戳的两个事件。

根据文档,t 的水印表示任何新事件的时间戳都将严格大于 t。除非您可以完全排除两个事件具有相同时间戳的可能性,否则您将不会安全地发出 t 的水印。强制执行不同的时间戳还将系统每秒可以处理的事件数限制为 1000。

这真的是 Apache Flink 中的问题还是有解决方法?

对于那些想要具体示例的人来说,我的用例是为按事件时间排序的流构建每小时汇总的滚动字数。对于我在文件中复制的数据样本(注意重复的 9):

mario 0
luigi 1
mario 2
mario 3
vilma 4
fred 5
bob 6
bob 7
mario 8
dan 9
dylan 9
dylan 11
fred 12
mario 13
mario 14
carl 15
bambam 16
summer 17
anna 18
anna 19
edu 20
anna 21
anna 22
anna 23
anna 24
anna 25

代码:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);


    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                        setTimer(ctx.timerService(), value.getTimestamp());
                    }

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                    if (currentWatermark < Long.MAX_VALUE) {
                        setTimer(ctx.timerService(), currentWatermark);
                    }
                }

                private void setTimer(TimerService service, long t) {
                    service.registerEventTimeTimer(((t / 10) + 1) * 10);
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            String[] wordsAndTimes = value.split(" ");
            out.collect(new TimestampedWord(wordsAndTimes[0], Long.parseLong(wordsAndTimes[1])));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        long timestamp = value.getField(2);
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + (timestamp - 10) + "-" + (timestamp - 1));
    }
}

我明白了

    mario=4 at 1-10
    dylan=2 at 1-10
    luigi=1 at 1-10
    fred=1 at 1-10
    bob=2 at 1-10
    vilma=1 at 1-10
    dan=1 at 1-10
    vilma=1 at 10-19
    luigi=1 at 10-19
    mario=6 at 10-19
    carl=1 at 10-19
    bambam=1 at 10-19
    dylan=2 at 10-19
    summer=1 at 10-19
    anna=2 at 10-19
    bob=2 at 10-19
    fred=2 at 10-19
    dan=1 at 10-19
    fred=2 at 9223372036854775797-9223372036854775806
    dan=1 at 9223372036854775797-9223372036854775806
    carl=1 at 9223372036854775797-9223372036854775806
    mario=6 at 9223372036854775797-9223372036854775806
    vilma=1 at 9223372036854775797-9223372036854775806
    edu=1 at 9223372036854775797-9223372036854775806
    anna=7 at 9223372036854775797-9223372036854775806
    summer=1 at 9223372036854775797-9223372036854775806
    bambam=1 at 9223372036854775797-9223372036854775806
    luigi=1 at 9223372036854775797-9223372036854775806
    bob=2 at 9223372036854775797-9223372036854775806
    dylan=2 at 9223372036854775797-9223372036854775806

注意 dylan=2 在 0-9 处应该是 1。

不,流元素具有相同的时间戳没有问题。但是 Watermark 断言所有随后发生的事件的时间戳都大于水印,因此这确实意味着您不能在时间 [=28] 安全地为流元素发出 Watermark t =]t,除非流中的时间戳严格单调递增——如果有多个事件具有相同的时间戳,则情况并非如此。这就是 AscendingTimestampExtractor 生成等于 currentTimestamp - 1 的水印的原因,您也应该这样做。

请注意,您的应用程序实际上报告 dylan=2 在 0-10,而不是在 0-9。这是因为 dylan 在时间 11 产生的水印触发了第一个计时器(计时器设置为时间 10,但由于没有时间戳为 10 的元素,该计时器不会触发,直到水印来自 "dylan 11" 到达)。并且您的 PrintlnSink 使用 timestamp - 1 来指示时间跨度的上限,因此是 11 - 1 或 10,而不是 9。

您的 ProcessFunction 的输出没有任何问题,如下所示:

(mario,4,11)
(dylan,2,11)
(luigi,1,11)
(fred,1,11)
(bob,2,11)
(vilma,1,11)
(dan,1,11)
(vilma,1,20)
(luigi,1,20)
(mario,6,20)
(carl,1,20)
(bambam,1,20)
(dylan,2,20)
...

到时间11确实有两个迪伦。但是 PrintlnSink 制作的报告具有误导性。

要让您的示例按预期工作,需要更改两件事。首先,水印需要满足水印合同,目前情况并非如此,其次,windowing 逻辑不太正确。 ProcessFunction 需要为 "dylan 11" 事件做好准备,以便在关闭 0-9 的 window 的计时器触发之前到达。这是因为 "dylan 11" 流元素在流中先于从中生成的水印。

更新:时间戳超出当前window(例如"dylan 11")的事件可以由

处理
  1. 跟踪当前 window 何时结束
  2. 不是增加计数器,而是将当前 window 之后的事件添加到列表
  3. 在 window 结束后,使用该列表中属于下一个 window
  4. 的事件