Is this a bug in Flink 1.12 batch mode?

I am using Flink 1.12 with the DataStream API in batch execution mode. My code:

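import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {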

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
//                new Person("Pebbles", 2),
                new Person("Pebbles", 2)
        );

        flintstones.keyBy(person -> person.age)
                .reduce((a, b) -> {
                    a.age = a.age + b.age; return a;
                }).print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name + ": age " + this.age;
        }
    }

}

I think the result should be:

Fred: age 70
Pebbles: age 2

But the actual result is:

11> Fred: age 70

Pebbles: age 2 is missing. If I uncomment the second new Person("Pebbles", 2), I get the correct result:

1> Pebbles: age 4
11> Fred: age 70

If I use the DataSet API instead, I also get the correct result. The code:

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
//                new Person("Pebbles", 2),
                new Person("Pebbles", 2)
        );

        flintstones.groupBy(person -> person.age)
                .reduce((a, b) -> {
                    a.age = a.age + b.age; return a;
                }).print();

//        env.execute();
    }

Result:

Fred: age 70
Pebbles: age 2

The code is only for testing and has no business significance. Is this a bug, or am I misunderstanding something?
My Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

This is a bug in reduce in batch execution mode. It has already been fixed on master, and the fix will be included in 1.12.1. See FLINK-20764.
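
For reference, the result the question expects can be reproduced without Flink. The following is only a minimal plain-Java sketch of what a keyed reduce should produce, not Flink's implementation: the class KeyedReduceSketch and the helper reduceByKey are hypothetical names made up for illustration. It shows that a key with a single element (Pebbles, age 2) should be emitted unchanged, because the reduce function is only called from the second element of a key onward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class KeyedReduceSketch {

    // Same shape as the Person class in the question.
    public static class Person {
        public String name;
        public Integer age;

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ": age " + age;
        }
    }

    // Hypothetical helper (not a Flink API): groups the input by key and
    // folds each group with the reducer, keeping first-seen key order.
    public static <T, K> List<T> reduceByKey(
            List<T> input, Function<T, K> keySelector, BinaryOperator<T> reducer) {
        Map<K, T> perKey = new LinkedHashMap<>();
        for (T element : input) {
            // merge() stores the first element of a key as is and only
            // calls the reducer when the key already has a value.
            perKey.merge(keySelector.apply(element), element, reducer);
        }
        return new ArrayList<>(perKey.values());
    }

    // Runs the three-element input from the question and returns the
    // reduced records as strings.
    public static List<String> run() {
        List<Person> flintstones = Arrays.asList(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        // Unlike the question's code, this reducer returns a new object
        // instead of mutating its first argument.
        List<Person> reduced = reduceByKey(
                flintstones,
                person -> person.age,
                (a, b) -> new Person(a.name, a.age + b.age));

        List<String> lines = new ArrayList<>();
        for (Person p : reduced) {
            lines.add(p.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Running main prints Fred: age 70 followed by Pebbles: age 2, which matches the DataSet result in the question and is what the DataStream batch job should print once the fix is available.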