Flink large size / small advance sliding window 性能
Flink large size / small advance sliding window performance
我的用例
- 输入是由 ID 键控的原始事件
- 我想统计每个ID过去7天的事件总数。
- 每 10 分钟输出一次
- 从逻辑上讲,这将通过滑动 window 大小 7 天并提前 10 分钟来处理
这个通过1天的翻滚window制定了一个很好的优化方案
所以我的逻辑是这样的
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val oneDayCounts = joins
.keyBy(keyFunction)
.map(t => (t.key, 1L, t.timestampMs))
.keyBy(0)
.timeWindow(Time.days(1))
val sevenDayCounts = oneDayCounts
.keyBy(0)
.timeWindow(Time.days(7), Time.minutes(10))
.sum(1)
// single reducer
sevenDayCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(10)))
P.S。忘掉单个减速器的性能问题。
问题
然而,如果我理解正确,这意味着由于滑动的性质,单个事件将产生 7*24*6=1008 条记录 window。所以我的问题是如何减少绝对数量?
有一张 JIRA 票 -- FLINK-11276 -- and a google doc 关于更有效地执行此操作的主题。
我还建议您看一下这个 paper and talk 关于 Efficient Window Aggregation with Stream Slicing.
我的用例
- 输入是由 ID 键控的原始事件
- 我想统计每个ID过去7天的事件总数。
- 每 10 分钟输出一次
- 从逻辑上讲,这将通过滑动 window 大小 7 天并提前 10 分钟来处理
这个
所以我的逻辑是这样的
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val oneDayCounts = joins
.keyBy(keyFunction)
.map(t => (t.key, 1L, t.timestampMs))
.keyBy(0)
.timeWindow(Time.days(1))
val sevenDayCounts = oneDayCounts
.keyBy(0)
.timeWindow(Time.days(7), Time.minutes(10))
.sum(1)
// single reducer
sevenDayCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(10)))
P.S。忘掉单个减速器的性能问题。
问题
然而,如果我理解正确,这意味着由于滑动的性质,单个事件将产生 7*24*6=1008 条记录 window。所以我的问题是如何减少绝对数量?
有一张 JIRA 票 -- FLINK-11276 -- and a google doc 关于更有效地执行此操作的主题。
我还建议您看一下这个 paper and talk 关于 Efficient Window Aggregation with Stream Slicing.