你能用泛型实现 Flink 的 AggregateFunction 吗?

Can you implement Flink's AggregateFunction with Generic Types?

我的目标是为Flink 1.10中的流处理模块提供一个接口。管道在其他运算符中包含一个 AggregateFunction。所有运算符都有泛型,但问题出在 AggregateFunction 中,它无法确定输出类型。

注意:实际的管道有一个 slidingEventTimeWindow 分配器和一个与 AggregateFunction 一起传递的 WindowFunction,但是使用下面的代码可以更容易地重现错误。

这是一个重现错误的简单测试用例:

    @Test
    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));

        ConfigAPI cfg = new ConfigAPI();

        source
                .keyBy(k -> k.f0)
                .countWindow(5, 1)
                .aggregate(new GenericAggregateFunc<>(cfg))
                .print();


        env.execute();
    }

如您所见,配置 class 作为参数传递给自定义 aggregateFunction。这就是用户要实现的。

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
        @Override
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

        @Override
        public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            acc.f1 += in.f1;
            return acc;
        }
    }

提供的接口为:

    public interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

GenericAggregateFunction:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

        private BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Acc createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Acc add(In in, Acc acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Acc getResult(Acc acc) {
            return acc;
        }
        @Override
        public Acc merge(Acc acc, Acc acc1) {
            return null;
        }
    }

输出日志:

org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
Otherwise the type has to be specified explicitly using type information.

解决方案 1(无效): 起初我以为这是“无法确定return类型”的常见情况,所以我尝试添加

.returns(Types.TUPLE(Types.STRING, Types.INT)).aggregate(...) 之后但没有成功。

解决方案 2(有效): 我用泛型创建了一个 Wrapper class,命名为 Accumulator<Acc> 然后作为 Type 传递给 AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>> 似乎有效。

虽然这看起来不太优雅,而且与界面的其余部分不太一致。这个问题还有其他解决方案吗?

编辑:感谢@deduper 的时间和洞察力,我想我找到了解决方案。

解决方案 3(有效):我创建了一个新界面,它以下列方式扩展了我的 BaseConfigAPIAggregateFunction

public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

public interface BaseConfigAPI extends Serializable {
    //These will be implemented directly from AggregateFunction interface
    //Acc createAcc();
    //Acc addAccumulators(In in, Acc acc);
        
    //other methods to override
}

现在用户只需实现 MergedConfigAPI<In, Acc, Out> 并将其作为参数传递给 .aggregate(...) 函数。

更新:我针对框架测试了@deduper 的第三个解决方案,但它也没有用。似乎异常是由 Acc 而不是 Out 类型抛出的。仔细查看 .aggregate 运算符的内部结构,我意识到有一个重载的 aggregate 方法需要另外 2 个参数。一个 TypeInformation<ACC> accumulatorType 和一个 TypeInformation<R> returnType.

这是最简单的解决方案,无需任何代码重构。

解决方案 4(有效)

 @Test
 public void aggregateFunction_genericType() throws Exception {
                ...

                .aggregate(
                        new GenericAggregateFunc<>(cfg), 
                        Types.TUPLE(Types.STRING, Types.INT),
                        Types.TUPLE(Types.STRING, Types.INT))
                ...
    }

注意:从 Flink 1.10.1 开始,aggregate 方法使用 @PublicEvolving 注释。

Can you implement Flink's AggregateFunction with Generic Types?

是的。你可以。正如您已经完成的那样。你的错误是你如何使用它的结果(use-site泛型 “) 而不是你如何 实施 它。

...Is there any other solution to this problem?...

我按照简单性

的升序提出以下三个候选方案
...
source
       .keyBy(k -> k.f0)
       .countWindow(5, 1)
       .aggregate(new GenericAggregateFunc< Tuple2<String, Integer>, Tuple2<String, Integer> >(cfg)) /* filling in the diamond will aid type inference */
       .print();
...

以上是最简单的,因为您不必重构原始 GenericAgregateFunc;只需在菱形中填写您想要实例化泛型 class 的特定类型参数。

还有一个稍微不那么简单的解决方案……

public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;
    GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
        this.cfg = cfg;
    }
    @Override
    public Tuple2<String, Integer> createAccumulator() {
        return cfg.createAcc();
    }
    @Override
    public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
        return cfg.addAccumulators(in, acc);
    }
    @Override
    public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
        return acc;
    }
    @Override
    public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
        return null;
    }
}

虽然这个涉及到一个小的重构,但它比第一个提议的解决方案更能简化你的整个应用程序 — 在我看来

Flink 已经为你处理了“复杂”的通用多态性。要 plug-in 到 Flink, 所要做的就是 简单地 实例化它们的 built-in 泛型 AggregateFunction<IN, ACC, OUT> 使用您想要实例化它的特定类型参数。在您的情况下,这些类型参数的类型为 Tuple2<String, Integer>

所以您仍然对第二种解决方案“使用泛型”,但您这样做的方式要简单得多。

另一个更接近您的原始实现的选项,但有一些小的重构…

public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {
    
    ...
    @Override
    public Out getResult(Acc acc) {
        return ...;
    }
    ...
}

此外,为了强制用户配置实现与您的功能兼容的接口的前提条件...

public interface BaseConfigAPI< In, Acc, Out >{ ... }

my experiment 中,我已确认将 Out 类型参数添加到 BaseConfigAPI 也使其兼容。

我确实有一个更复杂的替代解决方案。但由于越简单越好,我将把更复杂的解决方案留给其他人提出。