Flink 滑动计数 window 行为
Flink Sliding count window behavior
假设我们有这样的数据结构:
Tuple2<ArryaList<Long>, Integer>
第一个字段是一个 ArrayList
,长度为 1,包含时间戳,整数字段是一个介于 1 和 40 之间的数字,名为 channel
。目标是使用相同的键 (channel
) 聚合每 400 条消息并对它们应用 ReduceFunction
(它只是合并元组第一个字段中的 400 条消息的时间戳)。
我将 channel
字段设置为消息的键并创建一个 Count Window 为 400。例如,如果我们有 160000 消息作为输入,它应该输出 160000/400 = 400
行和计数 window 按预期工作。问题是当我使用滑动计数 window 时,我的预期行为是:
Flink 为每个 channel
数字创建逻辑 windows 并首次应用 ReduceFunction
, 如果逻辑 window的长度达到400,之后每输入100条数据,与逻辑window的key相同的key,就会调用ReduceFunction
获取[=80=中的最后400条消息], 所以我们应该有:
160000 - 400 = 159600
// 前400个输入会第一次调用reduce函数
159600 / 100 = 1596
// 在前 400 个输入后,对于每 100 个输入,Flink 为最后 400 个输入调用 reduce 函数
1 + 1596 = 1597
// 输出行数
但是运行滑动计数window,它输出1600行,长度可变。 (我预计输出长度仅为 400)
Point: 说的length 我的意思是ArrayList的大小(Tuple2的第一个字段)
- 第 40 个通道 --> 长度为 100
- 第二个40通道-->长度为299
- 第三个40通道-->长度为598
- 第四个40通道-->长度997
- 剩下40个通道-->长度为400
我如何证明这种行为的合理性并实现我想要的滑动计数 window?
这里是源代码:
DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);
更新: 根据@DavidAnderson的建议,我也尝试在ReduceFunstion
中创建一个新的元组而不是修改t0
,但结果是相同的输出。
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
感谢David Anderson建议,修改ReduceFunction
如下解决问题。我们应该在 ReduceFunction
:
中创建一个新对象
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = new ArrayList<>();
times.addAll(t0.f0);
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
请注意问题中的两种 reduce 方法都会导致不正确的输出。
现在输出如下所示:
- 第 40 个频道 --> 长度为 100
- 第二个40通道-->长度为200
- 第三个 40 通道 --> 长度为 300
- 剩下的每40个频道-->长度为400
所以 Flink Sliding Count window 行为是它调用 ReduceFunction
每个滑动计数输入消息。所以在我们有 160000 个输入消息的情况下,结果编号应该是:
160000/100 = 1600
这是countWindow的实现
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
它的行为与您预期的不太一样。 window 每 100 个元素(幻灯片)触发一次,无论它是否包含 400 个元素(大小)。大小控制最多保留多少个元素。
假设我们有这样的数据结构:
Tuple2<ArryaList<Long>, Integer>
第一个字段是一个 ArrayList
,长度为 1,包含时间戳,整数字段是一个介于 1 和 40 之间的数字,名为 channel
。目标是使用相同的键 (channel
) 聚合每 400 条消息并对它们应用 ReduceFunction
(它只是合并元组第一个字段中的 400 条消息的时间戳)。
我将 channel
字段设置为消息的键并创建一个 Count Window 为 400。例如,如果我们有 160000 消息作为输入,它应该输出 160000/400 = 400
行和计数 window 按预期工作。问题是当我使用滑动计数 window 时,我的预期行为是:
Flink 为每个 channel
数字创建逻辑 windows 并首次应用 ReduceFunction
, 如果逻辑 window的长度达到400,之后每输入100条数据,与逻辑window的key相同的key,就会调用ReduceFunction
获取[=80=中的最后400条消息], 所以我们应该有:
160000 - 400 = 159600
// 前400个输入会第一次调用reduce函数159600 / 100 = 1596
// 在前 400 个输入后,对于每 100 个输入,Flink 为最后 400 个输入调用 reduce 函数1 + 1596 = 1597
// 输出行数
但是运行滑动计数window,它输出1600行,长度可变。 (我预计输出长度仅为 400)
Point: 说的length 我的意思是ArrayList的大小(Tuple2的第一个字段)
- 第 40 个通道 --> 长度为 100
- 第二个40通道-->长度为299
- 第三个40通道-->长度为598
- 第四个40通道-->长度997
- 剩下40个通道-->长度为400
我如何证明这种行为的合理性并实现我想要的滑动计数 window?
这里是源代码:
DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);
更新: 根据@DavidAnderson的建议,我也尝试在ReduceFunstion
中创建一个新的元组而不是修改t0
,但结果是相同的输出。
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
感谢David Anderson建议,修改ReduceFunction
如下解决问题。我们应该在 ReduceFunction
:
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = new ArrayList<>();
times.addAll(t0.f0);
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
请注意问题中的两种 reduce 方法都会导致不正确的输出。 现在输出如下所示:
- 第 40 个频道 --> 长度为 100
- 第二个40通道-->长度为200
- 第三个 40 通道 --> 长度为 300
- 剩下的每40个频道-->长度为400
所以 Flink Sliding Count window 行为是它调用 ReduceFunction
每个滑动计数输入消息。所以在我们有 160000 个输入消息的情况下,结果编号应该是:
160000/100 = 1600
这是countWindow的实现
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
它的行为与您预期的不太一样。 window 每 100 个元素(幻灯片)触发一次,无论它是否包含 400 个元素(大小)。大小控制最多保留多少个元素。