强制驱逐滑动事件 windows 以在 Flink 上处理(历史流)
Force eviction of sliding event windows for processing (historical streams) on Flink
目前正在使用Flink进行流处理引擎的研究。在我的研究中,我使用历史流,它由以下形式的元组组成:
event_time, attribute_1, ..., attribute_X
其中 event_time
在处理过程中用作 TimeCharacteristic.EventTime
。此外,我通过以下任一方式将我的数据集推送到处理拓扑中:(i) 创建内存结构,或 (ii) 通过读取 CSV 文件本身。
不幸的是,我注意到即使有足够多的元组到达 window 运算符并完成完整的 window,window 也不会被推送到下游进行处理。结果,性能显着下降,很多时候我有一个 OutOfMemoryError
异常(具有大量历史流)。
为了说明典型的用例,我给出了以下示例:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));
DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
return t.f0;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
Time.milliseconds(1)))
.sum(1)
.print();
env.execute();
根据l
的内容,我需要得到以下windowed结果:
- [0, 2) 总和:11
- [1, 3) 总和:33
- [2, 4) 总和:55
- [3, 5) 总和:77
- [4, 6) 总和:99
- [5, 7) 总和:55
每个列表项可以读作[开始时间戳,结束时间戳),总和:X。
我希望 Flink 每次出现一个时间戳超过打开 window 的结束时间戳的元组时,都会产生一个 windowed 结果。例如,我希望在将带有时间戳 4L
的元组输入 window 运算符时生成 window [1, 3) 的总和。但是,当来自 l
的所有元组都被推送到流的拓扑中时,处理开始。当我处理更大的历史流时,也会发生同样的事情,这会导致性能下降(甚至耗尽内存)。
问题: 如何强制 Flink 在 window 完成时将 windows 推送到下游进行处理?
我相信对于 SlidingEventTimeWindows
,window 的驱逐是通过水印触发的。如果前一个是真的,我如何编写我的拓扑,以便它们在具有较晚时间戳的元组到达时触发 windows?
谢谢
AscendingTimestampExtractor
采用周期性水印策略,Flink每n毫秒调用一次getCurrentWatermark()
方法,n为autowatermarkinterval.
默认间隔为 200 毫秒,与您 windows 的大小相比非常长。但是,它们不能直接进行比较——200 毫秒是以处理时间而不是事件时间来衡量的。不过,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多 windows,我认为这可以解释您所看到的。
您可以减少自动加水印的时间间隔(可能减少到 1 毫秒)。或者您可以实施 AssignerWithPunctuatedWatermarks,这会给您更多的控制权。
目前正在使用Flink进行流处理引擎的研究。在我的研究中,我使用历史流,它由以下形式的元组组成:
event_time, attribute_1, ..., attribute_X
其中 event_time
在处理过程中用作 TimeCharacteristic.EventTime
。此外,我通过以下任一方式将我的数据集推送到处理拓扑中:(i) 创建内存结构,或 (ii) 通过读取 CSV 文件本身。
不幸的是,我注意到即使有足够多的元组到达 window 运算符并完成完整的 window,window 也不会被推送到下游进行处理。结果,性能显着下降,很多时候我有一个 OutOfMemoryError
异常(具有大量历史流)。
为了说明典型的用例,我给出了以下示例:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));
DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
return t.f0;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
Time.milliseconds(1)))
.sum(1)
.print();
env.execute();
根据l
的内容,我需要得到以下windowed结果:
- [0, 2) 总和:11
- [1, 3) 总和:33
- [2, 4) 总和:55
- [3, 5) 总和:77
- [4, 6) 总和:99
- [5, 7) 总和:55
每个列表项可以读作[开始时间戳,结束时间戳),总和:X。
我希望 Flink 每次出现一个时间戳超过打开 window 的结束时间戳的元组时,都会产生一个 windowed 结果。例如,我希望在将带有时间戳 4L
的元组输入 window 运算符时生成 window [1, 3) 的总和。但是,当来自 l
的所有元组都被推送到流的拓扑中时,处理开始。当我处理更大的历史流时,也会发生同样的事情,这会导致性能下降(甚至耗尽内存)。
问题: 如何强制 Flink 在 window 完成时将 windows 推送到下游进行处理?
我相信对于 SlidingEventTimeWindows
,window 的驱逐是通过水印触发的。如果前一个是真的,我如何编写我的拓扑,以便它们在具有较晚时间戳的元组到达时触发 windows?
谢谢
AscendingTimestampExtractor
采用周期性水印策略,Flink每n毫秒调用一次getCurrentWatermark()
方法,n为autowatermarkinterval.
默认间隔为 200 毫秒,与您 windows 的大小相比非常长。但是,它们不能直接进行比较——200 毫秒是以处理时间而不是事件时间来衡量的。不过,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多 windows,我认为这可以解释您所看到的。
您可以减少自动加水印的时间间隔(可能减少到 1 毫秒)。或者您可以实施 AssignerWithPunctuatedWatermarks,这会给您更多的控制权。