Javadoc of the SingleOutputStreamOperator#returns(TypeHint<T> typeHint) method

I am reading the source code of SingleOutputStreamOperator#returns, and its Javadoc is:

/**
 * Adds a type information hint about the return type of this operator. This method
 * can be used in cases where Flink cannot determine automatically what the produced
 * type of a function is. That can be the case if the function uses generic type variables
 * in the return type that cannot be inferred from the input type.
 *
 * <p>Use this method the following way:
 * <pre>{@code
 *     DataStream<Tuple2<String, Double>> result =
 *         stream.flatMap(new FunctionWithNonInferrableReturnType())
 *               .returns(new TypeHint<Tuple2<String, Double>>(){});
 * }</pre>
 *
 * @param typeHint The type hint for the returned data type.
 * @return This operator with the type information corresponding to the given type hint.
 */

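It mentions FunctionWithNonInferrableReturnType to illustrate why the returns method is needed, but I cannot work out how to write such a class, that is, one whose return type is non-inferrable. Could someone write a simple one? Thanks!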

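When the documentation says NonInferrableReturnType, it means the function's return type is a type variable such as <T> (or any other letter you like). So you can create a MapFunction that returns a T. You then have to call .returns(...) to tell Flink the actual type, for example .returns(TypeInformation.of(String.class)) if your goal is to return a String.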

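import org.apache.flink.api.common.functions.MapFunction;

// The output type is the type variable T, so Flink cannot infer it from the class declaration.
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
    @Override
    @SuppressWarnings("unchecked")
    public T map(AbstractDataModel value) throws Exception {
        // Unchecked cast: the caller decides what T is (String in this example).
        return (T) value.getValue();
    }
}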

Here I am using the classes from your previous question. The same code without .returns(TypeInformation.of(String.class)) compiles, but throws a runtime exception:

could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
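The type-erasure problem named in that message can be reproduced in miniature without Flink. The following is a minimal sketch, assuming that plain Java reflection roughly approximates what Flink's type extraction can see; Mapper, ConcreteMapper, GenericMapper and declaredOutputType are hypothetical names for illustration, not Flink APIs.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Plain-Java illustration only (no Flink dependency); all names are hypothetical.
class ErasureDemo {

    // Stand-in for a function interface such as MapFunction<IN, OUT>.
    interface Mapper<I, O> {
        O map(I value);
    }

    // Inferrable: the output type String is written into the class declaration.
    static class ConcreteMapper implements Mapper<Integer, String> {
        @Override
        public String map(Integer value) {
            return String.valueOf(value);
        }
    }

    // Non-inferrable: the output type is a type variable, which is erased at runtime.
    static class GenericMapper<T> implements Mapper<Integer, T> {
        @Override
        @SuppressWarnings("unchecked")
        public T map(Integer value) {
            return (T) String.valueOf(value);
        }
    }

    // Returns the name of the second type argument of Mapper,
    // as recorded in the declaration of the object's class.
    static String declaredOutputType(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Mapper.class) {
                    return p.getActualTypeArguments()[1].getTypeName();
                }
            }
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // The concrete class exposes its output type to reflection.
        System.out.println(declaredOutputType(new ConcreteMapper()));        // java.lang.String
        // The generic class only exposes the variable name, even though
        // the instance was created as GenericMapper<String>.
        System.out.println(declaredOutputType(new GenericMapper<String>())); // T
    }
}
```

The second call shows why a hint is needed: the String written at the call site is not stored anywhere in the object, so only the caller can supply it.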

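import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class NonInferrableReturnTypeStreamJob {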

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    public NonInferrableReturnTypeStreamJob() {
        this.abstractDataModelList = new ArrayList<AbstractDataModel>();
        this.abstractDataModelList.add(new ConcreteModel("a", "1"));
        this.abstractDataModelList.add(new ConcreteModel("a", "2"));
        this.sink = new ValenciaSinkFunction();
    }

    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    public static void main(String[] args) throws Exception {
        NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
        concreteModelTest.execute();
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(this.abstractDataModelList)
                .map(new MyMapFunctionNonInferrableReturnType<String>())
                .returns(TypeInformation.of(String.class))
                .addSink(sink);

        env.execute();
    }
}
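The Javadoc in the question passes new TypeHint<...>(){} with trailing braces instead of a TypeInformation. Those braces create an anonymous subclass, and a subclass declaration keeps its type argument visible to reflection. Below is a rough plain-Java sketch of that trick; Hint is a hypothetical stand-in for Flink's TypeHint, whose real implementation may differ.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Plain-Java illustration only (no Flink dependency); Hint is a hypothetical class.
class HintDemo {

    // Abstract, so it can only be instantiated through a (typically anonymous) subclass.
    abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // The subclass declaration "extends Hint<X>" records X, and reflection can read it.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("Hint must be created with a type argument");
            }
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public Type getType() {
            return captured;
        }
    }

    public static void main(String[] args) {
        // The trailing {} makes each of these an anonymous subclass of Hint.
        Hint<String> simple = new Hint<String>() {};
        Hint<List<String>> nested = new Hint<List<String>>() {};

        System.out.println(simple.getType().getTypeName()); // java.lang.String
        System.out.println(nested.getType().getTypeName()); // java.util.List<java.lang.String>
    }
}
```

Going by the Javadoc quoted in the question, .returns(new TypeHint<String>(){}) should be another way to supply the hint in the job above. The TypeHint form is mainly useful for nested generic types such as Tuple2<String, Double>, which a plain Class object cannot describe.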

If you like, here is an integration test for this example:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class NonInferrableReturnTypeStreamJobTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster;
    private final int minAvailableProcessors = 4;
    private final boolean runInParallel;

    public NonInferrableReturnTypeStreamJobTest() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.runInParallel = availableProcessors >= minAvailableProcessors;
        if (this.runInParallel) {
            flinkCluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(minAvailableProcessors)
                            .setNumberTaskManagers(1)
                            .build());
        }
    }

    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}