Flink: does Flink support an abstract operator that can process different data streams with common fields?
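Suppose we have multiple data streams that share some common characteristics.
For example, we have a Teacher stream and a Student stream, and both have an age field. If I want to find the eldest student or teacher from the real-time stream, I can implement an operator like the one below.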
import org.apache.flink.api.common.functions.RichMapFunction;

public class MaxiumAgeFunc extends RichMapFunction<Student, Integer> {
    // Largest age seen so far by this operator instance
    int maxAge;

    @Override
    public Integer map(Student s) throws Exception {
        if (s.age > maxAge) {
            maxAge = s.age;
        }
        return maxAge;
    }
}
To find the eldest teacher, we need to implement a nearly identical operator:
public class MaxiumAgeFunc extends RichMapFunction<Teacher, Integer> {
    // Largest age seen so far by this operator instance
    int maxAge;

    @Override
    public Integer map(Teacher t) throws Exception {
        if (t.age > maxAge) {
            maxAge = t.age;
        }
        return maxAge;
    }
}
But these two operators actually share the same processing logic, so my idea is to define a parent class, for example People.
public class People{
public Integer age;
}
Then Student and Teacher can be defined as its child classes, each keeping its own fields as well.
public class Student extends People {
public Integer grade; // student grade
...
}
public class Teacher extends People {
public Integer subject; // the subject that teacher teaches
...
}
In that case, I can define a single operator as follows.
public class MaxiumAgeFunc extends RichMapFunction<People, Integer> {
    // Largest age seen so far by this operator instance
    int maxAge;

    @Override
    public Integer map(People p) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}
But when I try to build a Flink execution topology with this operator, it does not work because the data types do not match.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);
studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();
So here is my question: is it possible to write an abstract operator for input streams that have common fields?
This is more of a Java question than a Flink question. What you want to do is make MaxiumAgeFunc generic, parameterized like this:
public class MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
    // Largest age seen so far by this operator instance
    int maxAge;

    @Override
    public Integer map(T p) throws Exception {
        // The bound T extends People is what makes p.age accessible here
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        return maxAge;
    }
}
and then use it like this:
studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();
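To see why the bounded type parameter fixes the mismatch without pulling in Flink, here is a minimal plain-Java sketch. It assumes java.util.function.Function as a stand-in for Flink's MapFunction, and the names GenericMaxAgeSketch, MaxAge and lastResult are made up for illustration. Like DataStream<T>.map, lastResult fixes T from the "stream", so a Function<People, Integer> would be rejected for a list of Student, while MaxAge<> is inferred per call.

```java
import java.util.List;
import java.util.function.Function;

class GenericMaxAgeSketch {
    static class People {
        public Integer age;
        People(Integer age) { this.age = age; }
    }
    static class Student extends People {
        Student(Integer age) { super(age); }
    }
    static class Teacher extends People {
        Teacher(Integer age) { super(age); }
    }

    // Stand-in for MaxiumAgeFunc<T extends People>; Function replaces MapFunction here.
    static class MaxAge<T extends People> implements Function<T, Integer> {
        private int maxAge = Integer.MIN_VALUE;

        @Override
        public Integer apply(T p) {
            // The bound guarantees every T has an age field
            if (p.age > maxAge) {
                maxAge = p.age;
            }
            return maxAge;
        }
    }

    // Hypothetical helper with the same generic shape as DataStream<T>.map(MapFunction<T, R>):
    // T is fixed by the element type of the input.
    static <T> Integer lastResult(List<T> stream, Function<T, Integer> f) {
        Integer last = null;
        for (T element : stream) {
            last = f.apply(element);
        }
        return last;
    }

    public static void main(String[] args) {
        List<Student> students = List.of(new Student(19), new Student(23), new Student(21));
        List<Teacher> teachers = List.of(new Teacher(41), new Teacher(35));

        // The diamond is inferred as MaxAge<Student> and MaxAge<Teacher> respectively
        System.out.println(lastResult(students, new MaxAge<>())); // prints 23
        System.out.println(lastResult(teachers, new MaxAge<>())); // prints 41
    }
}
```

Each call gets its own instance, so the running maximum for students and teachers is tracked separately, just as with two separate operators in the topology.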
Edit:
By the way, your function does not work with checkpointing (so it will produce wrong results upon recovery from a checkpoint), and I'd rather go with an aggregation function over the global window.
students
.windowAll(GlobalWindows.create())
.aggregate(new AggregateFunction<People, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return -1;
}
@Override
public Integer add(People value, Integer accumulator) {
return Math.max(value.age, accumulator);
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return Math.max(a, b);
}
});
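For intuition about what the four methods above do, here is a plain-Java sketch that does not use Flink at all. The Aggregate interface is a hypothetical stand-in that only mirrors the method names of AggregateFunction, and fold is a made-up helper. The point it illustrates is that the accumulator is the only state involved, which is what lets the framework manage it instead of a plain field inside the function.

```java
import java.util.List;

class MaxAgeAggregateSketch {
    // Hypothetical stand-in mirroring the four methods of Flink's AggregateFunction<IN, ACC, OUT>
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class People {
        public Integer age;
        People(Integer age) { this.age = age; }
    }

    // Same logic as the anonymous class above: the accumulator is the running maximum
    static class MaxAge implements Aggregate<People, Integer, Integer> {
        @Override
        public Integer createAccumulator() { return -1; }

        @Override
        public Integer add(People value, Integer accumulator) {
            return Math.max(value.age, accumulator);
        }

        @Override
        public Integer getResult(Integer accumulator) { return accumulator; }

        @Override
        public Integer merge(Integer a, Integer b) { return Math.max(a, b); }
    }

    // Made-up helper: feeds every value through add(), starting from a fresh accumulator
    static <IN, ACC> ACC fold(Aggregate<IN, ACC, ?> agg, List<? extends IN> values) {
        ACC acc = agg.createAccumulator();
        for (IN v : values) {
            acc = agg.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        MaxAge agg = new MaxAge();
        // Two partial accumulators, e.g. built from different parts of the input
        Integer left = fold(agg, List.of(new People(30), new People(52)));
        Integer right = fold(agg, List.of(new People(47)));
        System.out.println(agg.getResult(agg.merge(left, right))); // prints 52
    }
}
```

Because the function object itself holds no fields, restoring the accumulator value is enough to resume the computation where it left off.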