Is this a bug in Flink 1.12 batch mode?
When I use Flink 1.12 in batch mode, my code is:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the DataStream program with batch semantics.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Person> flintstones = env.fromElements(
            new Person("Fred", 35),
            new Person("Wilma", 35),
            // new Person("Pebbles", 2),
            new Person("Pebbles", 2)
        );
        // Key by age and sum the ages within each key.
        flintstones.keyBy(person -> person.age)
            .reduce((a, b) -> {
                a.age = a.age + b.age;
                return a;
            }).print();
        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ": age " + age;
        }
    }
}
I think the result should be:
Fred: age 70
Pebbles: age 2
But the actual result is:
11> Fred: age 70
The record Pebbles: age 2 is missing.
If I uncomment the line new Person("Pebbles", 2), I get the correct result:
1> Pebbles: age 4
11> Fred: age 70
If I use the DataSet API, I also get the correct result. Code:
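// Requires org.apache.flink.api.java.DataSet and
// org.apache.flink.api.java.ExecutionEnvironment; Person is the class shown above.
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Person> flintstones = env.fromElements(
        new Person("Fred", 35),
        new Person("Wilma", 35),
        // new Person("Pebbles", 2),
        new Person("Pebbles", 2)
    );
    // Group by age and sum the ages within each group.
    flintstones.groupBy(person -> person.age)
        .reduce((a, b) -> {
            a.age = a.age + b.age;
            return a;
        }).print();
    // print() already triggers execution in the DataSet API.
    // env.execute();
}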
Result:
Fred: age 70
Pebbles: age 2
The code is only for testing and has no business significance. Is this a bug, or am I misunderstanding something?
My Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
This is a bug in reduce under batch execution mode. It has already been fixed on master, and the fix will be included in 1.12.1. See FLINK-20764.
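For reference, here is a minimal sketch in plain Java (no Flink involved) of the output the question expects from a keyed reduce over a bounded input: one final value per key, including keys that hold only a single record. The class name KeyedReduceSketch and the method reduceByAge are hypothetical names made up for this illustration, and the immutable Person here is a simplified stand-in for the class in the question. It models the expected semantics only, not how Flink implements them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyedReduceSketch {
    // Simplified, immutable stand-in for the Person class in the question.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ": age " + age;
        }
    }

    // Hypothetical helper: groups records by their incoming age and sums the
    // ages per group, keeping the name of the first record seen for each key.
    // A key with a single record still produces exactly one result.
    static List<String> reduceByAge(List<Person> input) {
        Map<Integer, Person> perKey = new LinkedHashMap<>();
        for (Person p : input) {
            perKey.merge(p.age, p, (a, b) -> new Person(a.name, a.age + b.age));
        }
        List<String> results = new ArrayList<>();
        for (Person reduced : perKey.values()) {
            results.add(reduced.toString());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Person> flintstones = Arrays.asList(
            new Person("Fred", 35),
            new Person("Wilma", 35),
            new Person("Pebbles", 2));
        // Prints one line per key, in first-seen key order.
        reduceByAge(flintstones).forEach(System.out::println);
    }
}
```

With the three records from the question, this prints Fred: age 70 followed by Pebbles: age 2, which matches the DataSet output above and is what the DataStream job should also produce once the fix is in place.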