Apache Flink - 从另一个 Stream 的 MapFunction 访问 WindowedStream 的内部缓冲区
Apache Flink - Access internal buffer of WindowedStream from another Stream's MapFunction
我有一个基于 Apache Flink 的流应用程序,具有以下设置:
- 数据源:每分钟生成一次数据。
- Windowed Stream 使用 CountWindow,size=100,slide=1(滑动计数 window)。
- ProcessWindow函数对 Window.
中的数据应用一些计算(例如 F(x) )
- 使用输出流的数据接收器
这很好用。现在,我想让用户能够提供函数 G(x) 并将其应用于 Window 中的当前数据,并实时将输出发送给用户
我不是在询问如何应用任意函数 G(x) - 我正在使用动态脚本来执行此操作。我在问如何从另一个流的映射函数访问 window 中的缓冲数据。
澄清一些代码
DataStream<Foo> in = .... // source data produced every minute
in
.keyBy(new MyKeySelector())
.countWindow(100, 1)
.process(new MyProcessFunction())
.addSink(new MySinkFunction())
// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement
DataStream<Function> userRequest = .... // request function from user
userRequest.map(new MapFunction<Function, FunctionResult>(){
public FunctionResult map(Function Gx) throws Exception {
Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
FunctionResult result = Gx.apply(windowedDataFromAbove);
return result;
}
})
连接两个流,然后使用 CoProcessFunction。获取函数流的方法调用可以将它们应用于其他方法调用的 window.
中的内容
如果你想广播 Functions,那么你要么需要使用 Flink 1.5(它支持连接键控流和广播流),要么使用一些直升机特技来创建一个可以同时包含 Foo 和 Function 的流类型,适当复制函数(和密钥生成)来模拟广播。
假设 Fx 即时聚合传入的 foo,而 Gx 处理 window 的 foo,您应该能够实现您想要的,如下所示:
DataStream<Function> userRequest = .... // request function from user
Iterator<Function> iter = DataStreamUtils.collect(userRequest);
Function Gx = iter.next();
DataStream<Foo> in = .... // source data
.keyBy(new MyKeySelector())
.countWindow(100, 1)
.fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
.addSink(new MySinkFunction())
折叠函数(传入数据一到达就对其进行操作)可以这样定义:
private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
@Override
public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
acc.f1.add(foo);
return acc;
}
}
处理器函数可以是这样的:
public class MyProcessorFunc
extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {
public MyProcessorFunc(Function Gx) {
super();
this.Gx = Gx;
}
@Override
public void process(String key, Context context,
Iterable<Tuple2<Integer, List<foo>> accIt,
Collector<Tuple2<Integer, FunctionResult>> out) {
Tuple2<Integer, List<foo> acc = accIt.iterator().next();
out.collect(new Tuple2<Integer, FunctionResult>(
acc.f0, // your Fx aggregation
Gx.apply(acc.f1), // your Gx results
));
}
}
请注意,fold\reduce 函数默认不会在内部缓冲元素。我们在这里使用 fold 来计算动态指标,并创建一个包含 window 个项目的列表。
如果您有兴趣在翻滚 windows(而非滑动)上应用 Gx,您可以在管道中使用翻滚 windows。要计算滑动计数 和 ,您可以让管道的另一个分支仅计算滑动计数(不应用 Gx)。这样,您不必为每个 window.
保留 100 个列表
注意:您可能需要添加以下依赖才能使用 DataStreamUtils:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-contrib</artifactId>
<version>0.10.2</version>
</dependency>
我有一个基于 Apache Flink 的流应用程序,具有以下设置:
- 数据源:每分钟生成一次数据。
- Windowed Stream 使用 CountWindow,size=100,slide=1(滑动计数 window)。
- ProcessWindow函数对 Window. 中的数据应用一些计算(例如 F(x) )
- 使用输出流的数据接收器
这很好用。现在,我想让用户能够提供函数 G(x) 并将其应用于 Window 中的当前数据,并实时将输出发送给用户
我不是在询问如何应用任意函数 G(x) - 我正在使用动态脚本来执行此操作。我在问如何从另一个流的映射函数访问 window 中的缓冲数据。
澄清一些代码
DataStream<Foo> in = .... // source data produced every minute
in
.keyBy(new MyKeySelector())
.countWindow(100, 1)
.process(new MyProcessFunction())
.addSink(new MySinkFunction())
// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement
DataStream<Function> userRequest = .... // request function from user
userRequest.map(new MapFunction<Function, FunctionResult>(){
public FunctionResult map(Function Gx) throws Exception {
Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
FunctionResult result = Gx.apply(windowedDataFromAbove);
return result;
}
})
连接两个流,然后使用 CoProcessFunction。获取函数流的方法调用可以将它们应用于其他方法调用的 window.
中的内容如果你想广播 Functions,那么你要么需要使用 Flink 1.5(它支持连接键控流和广播流),要么使用一些直升机特技来创建一个可以同时包含 Foo 和 Function 的流类型,适当复制函数(和密钥生成)来模拟广播。
假设 Fx 即时聚合传入的 foo,而 Gx 处理 window 的 foo,您应该能够实现您想要的,如下所示:
DataStream<Function> userRequest = .... // request function from user
Iterator<Function> iter = DataStreamUtils.collect(userRequest);
Function Gx = iter.next();
DataStream<Foo> in = .... // source data
.keyBy(new MyKeySelector())
.countWindow(100, 1)
.fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
.addSink(new MySinkFunction())
折叠函数(传入数据一到达就对其进行操作)可以这样定义:
private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
@Override
public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
acc.f1.add(foo);
return acc;
}
}
处理器函数可以是这样的:
public class MyProcessorFunc
extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {
public MyProcessorFunc(Function Gx) {
super();
this.Gx = Gx;
}
@Override
public void process(String key, Context context,
Iterable<Tuple2<Integer, List<foo>> accIt,
Collector<Tuple2<Integer, FunctionResult>> out) {
Tuple2<Integer, List<foo> acc = accIt.iterator().next();
out.collect(new Tuple2<Integer, FunctionResult>(
acc.f0, // your Fx aggregation
Gx.apply(acc.f1), // your Gx results
));
}
}
请注意,fold\reduce 函数默认不会在内部缓冲元素。我们在这里使用 fold 来计算动态指标,并创建一个包含 window 个项目的列表。
如果您有兴趣在翻滚 windows(而非滑动)上应用 Gx,您可以在管道中使用翻滚 windows。要计算滑动计数 和 ,您可以让管道的另一个分支仅计算滑动计数(不应用 Gx)。这样,您不必为每个 window.
保留 100 个列表注意:您可能需要添加以下依赖才能使用 DataStreamUtils:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-contrib</artifactId>
<version>0.10.2</version>
</dependency>