强制驱逐滑动事件 windows 以在 Flink 上处理(历史流)

Force eviction of sliding event windows for processing (historical streams) on Flink

目前正在使用Flink进行流处理引擎的研究。在我的研究中,我使用历史流,它由以下形式的元组组成:

event_time, attribute_1, ..., attribute_X

其中 event_time 在处理过程中用作 TimeCharacteristic.EventTime。此外,我通过以下任一方式将我的数据集推送到处理拓扑中:(i) 创建内存结构,或 (ii) 通过读取 CSV 文件本身。

不幸的是,我注意到即使有足够多的元组到达 window 运算符并完成完整的 window,window 也不会被推送到下游进行处理。结果,性能显着下降,很多时候我有一个 OutOfMemoryError 异常(具有大量历史流)。

为了说明典型的用例,我给出了以下示例:

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
    l.add(new Tuple2<>(1L, 11));
    l.add(new Tuple2<>(2L, 22));
    l.add(new Tuple2<>(3L, 33));
    l.add(new Tuple2<>(4L, 44));
    l.add(new Tuple2<>(5L, 55));
    DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
    stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
        .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2), 
                Time.milliseconds(1)))
        .sum(1)
        .print();
    env.execute();

根据l的内容,我需要得到以下windowed结果:

每个列表项可以读作[开始时间戳,结束时间戳),总和:X。

我希望 Flink 每次出现一个时间戳超过打开 window 的结束时间戳的元组时,都会产生一个 windowed 结果。例如,我希望在将带有时间戳 4L 的元组输入 window 运算符时生成 window [1, 3) 的总和。但是,当来自 l 的所有元组都被推送到流的拓扑中时,处理开始。当我处理更大的历史流时,也会发生同样的事情,这会导致性能下降(甚至耗尽内存)。

问题: 如何强制 Flink 在 window 完成时将 windows 推送到下游进行处理?

我相信对于 SlidingEventTimeWindows,window 的驱逐是通过水印触发的。如果前一个是真的,我如何编写我的拓扑,以便它们在具有较晚时间戳的元组到达时触发 windows?

谢谢

AscendingTimestampExtractor采用周期性水印策略,Flink每n毫秒调用一次getCurrentWatermark()方法,n为autowatermarkinterval.

默认间隔为 200 毫秒,与您 windows 的大小相比非常长。但是,它们不能直接进行比较——200 毫秒是以处理时间而不是事件时间来衡量的。不过,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多 windows,我认为这可以解释您所看到的。

您可以减少自动加水印的时间间隔(可能减少到 1 毫秒)。或者您可以实施 AssignerWithPunctuatedWatermarks,这会给您更多的控制权。