Apache Flink:如何处理三个流
Apache Flink: How to process three streams
我想在一个operator.For中接收并处理三个流,在Storm中实现的代码如下:
builder.setBolt("C_bolt", C_bolt(), parallelism_hint)
.fieldsGrouping("A_bolt", "TRAINING", new Fields("word"))
.fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word"))
.allGrouping("A_bolt", "SUM");
在Flink中实现了SUM stream(A_bolt's SideOutput)
和TRAINING stream(A_bolt)
的处理:
SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
.keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
@Override
public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
return in.f0;
}
})
.connect(Sum)
.flatMap(new Process())
.setParallelism(parallelism);
但是我不知道怎么加ANALYSIS stream(B_bolt)
。感谢您的帮助。
Flink 只支持单输入和双输入流算子。您的选择是:
- 使用 union() 创建一个包含来自所有三个流的所有元素的合并流(它们必须都是同一类型,尽管您可以使用 Either 来协助完成此操作)。
- 在使用coFlatMap 合并两个流后,将初步结果连接到第三个流,使用另一个coFlatMap(或coProcessFunction)完成处理。
或者这两种技术的组合在您的情况下更可取。
我想在一个operator.For中接收并处理三个流,在Storm中实现的代码如下:
builder.setBolt("C_bolt", C_bolt(), parallelism_hint)
.fieldsGrouping("A_bolt", "TRAINING", new Fields("word"))
.fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word"))
.allGrouping("A_bolt", "SUM");
在Flink中实现了SUM stream(A_bolt's SideOutput)
和TRAINING stream(A_bolt)
的处理:
SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
.keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
@Override
public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
return in.f0;
}
})
.connect(Sum)
.flatMap(new Process())
.setParallelism(parallelism);
但是我不知道怎么加ANALYSIS stream(B_bolt)
。感谢您的帮助。
Flink 只支持单输入和双输入流算子。您的选择是:
- 使用 union() 创建一个包含来自所有三个流的所有元素的合并流(它们必须都是同一类型,尽管您可以使用 Either 来协助完成此操作)。
- 在使用coFlatMap 合并两个流后,将初步结果连接到第三个流,使用另一个coFlatMap(或coProcessFunction)完成处理。
或者这两种技术的组合在您的情况下更可取。