How to insert into retracting sink from retracting stream with blink planner

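I'm trying to take a Flink Table, convert it into a retract stream, and then connect that stream to a sink. I was able to do this with the original table planner using CRow, but Flink's Blink planner no longer seems to support CRow. Is there a way to do this with the Blink planner?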

For reference, we previously did this by mapping the retract stream to the CRow type before connecting it to a RetractStreamTableSink.

Below is an example unit test of what I'm trying to accomplish. Note that the commented-out code block works fine with the old planner.

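This fails with the following exception. That makes sense, because the retract stream has type Tuple2<Boolean, Row> while the sink has type Row, but I don't see a way to wire a retracting DataStream of Tuple2<Boolean, Row> into a RetractStreamTableSink<Row>.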
    org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
    Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
    Sink schema: [f0: STRING, f1: STRING]

    @Test
    public void retractStream() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

        Row row1 = new Row(2);
        row1.setField(0, "1");
        row1.setField(1, "2");

        SingleOutputStreamOperator<Row> source =
                executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

        tableEnvironment.createTemporaryView("table1", source, "key, id");
        Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
        RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
        // as POJOs
        // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
        //         .toRetractStream(outputTable, rowTypeInfo)
        //         .map(value -> new CRow(value.f1, value.f0))
        //         .returns(new CRowTypeInfo(rowTypeInfo));

        // Create the retracting stream
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
        tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

        // Create a sink
        TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
        CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
        RetractSink retractTableSink = new RetractSink(collectingTableSink);
        tableEnvironment.registerTableSink("sink2", retractTableSink);

        // Wire up the table and the sink (this is what fails)
        tableEnvironment.from("outputTable").insertInto("sink2");
        executionEnvironment.execute();
        System.out.println(collectingTableSink.rows);
    }

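So I found a workaround: if you create a shim interface that extends AppendStreamTableSink<Tuple2<Boolean, Row>>, give it the same default methods as RetractStreamTableSink, and then override consumeDataStream as shown below, you can get from Tuple2 back to Row without needing CRow.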

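This is exactly what RetractStreamTableSink is meant for, but something makes the Blink planner fail when it is used, even though the shim AppendStreamTableSink is identical to RetractStreamTableSink: all methods are overridden and equal, and the only difference is the name of the interface being implemented. I strongly suspect this is a bug in the Blink planner, but I haven't been able to pin down where it comes from.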
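To make the intended retract semantics concrete, here is a plain-Java model with no Flink dependency (it assumes Java 16+ for records). Each message pairs a flag with a row: true adds the row, false retracts an earlier copy of it. ChangeMessage, applyChanges and the sample changelog are hypothetical names chosen for illustration, not Flink API, and a real RetractStreamTableSink decides for itself how it applies retractions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractModel {

    // Hypothetical stand-in for Tuple2<Boolean, Row>: isAdd plays the role of f0, row of f1.
    public record ChangeMessage(boolean isAdd, List<String> row) {}

    // Materializes the changelog into a multiset (row -> number of live copies):
    // true adds one copy, false retracts one, and a count that reaches 0 is removed.
    public static Map<List<String>, Integer> applyChanges(List<ChangeMessage> messages) {
        Map<List<String>, Integer> counts = new HashMap<>();
        for (ChangeMessage m : messages) {
            // Returning null from the remapping function removes the key from the map.
            counts.merge(m.row(), m.isAdd() ? 1 : -1,
                    (oldCount, delta) -> (oldCount + delta == 0) ? null : oldCount + delta);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical changelog of an updating query: row ("a", "1") is later replaced by ("a", "2").
        List<ChangeMessage> changelog = List.of(
                new ChangeMessage(true, List.of("a", "1")),
                new ChangeMessage(false, List.of("a", "1")),
                new ChangeMessage(true, List.of("a", "2")));
        System.out.println(applyChanges(changelog)); // {[a, 2]=1}
    }
}
```

For an append-only query such as select key, id from table1, every flag is true, so this model and a plain append sink end up with the same rows; they only diverge once the query emits retractions.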

The snippet that does the conversion:

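    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Keep only accumulate messages (f0 == true); retraction messages are dropped here.
        // Then unwrap the Row payload (f1). returns(...) supplies the Row type information
        // that the lambda erases. `delegate` is the wrapped Row sink held by the shim.
        DataStream<Row> filteredAndMapped =
                dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());

        return delegate.consumeDataStream(filteredAndMapped);
    }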
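The filter(x -> x.f0).map(x -> x.f1) step can be modeled the same way in plain Java to see what reaches the delegate. This is a sketch under stated assumptions, not the Flink implementation: ChangeMessage stands in for Tuple2<Boolean, Row>, and AppendOnlySink and CollectingSink are hypothetical stand-ins for the wrapped sink (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ShimModel {

    // Hypothetical stand-in for Tuple2<Boolean, Row>: isAdd is f0, row is f1.
    public record ChangeMessage(boolean isAdd, List<String> row) {}

    // Hypothetical append-only sink: it only ever receives plain rows, never flags.
    interface AppendOnlySink {
        void consume(List<List<String>> rows);
    }

    static final class CollectingSink implements AppendOnlySink {
        final List<List<String>> rows = new ArrayList<>();

        @Override
        public void consume(List<List<String>> in) {
            rows.addAll(in);
        }
    }

    // Mirrors filter(x -> x.f0).map(x -> x.f1): keep accumulate messages, unwrap the row.
    public static List<List<String>> filterAndMap(List<ChangeMessage> messages) {
        return messages.stream()
                .filter(ChangeMessage::isAdd)
                .map(ChangeMessage::row)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical updating changelog: ("a", "1") is retracted and replaced by ("a", "2").
        List<ChangeMessage> changelog = List.of(
                new ChangeMessage(true, List.of("a", "1")),
                new ChangeMessage(false, List.of("a", "1")),
                new ChangeMessage(true, List.of("a", "2")));
        CollectingSink delegate = new CollectingSink();
        delegate.consume(filterAndMap(changelog));
        System.out.println(delegate.rows); // [[a, 1], [a, 2]]
    }
}
```

The retraction is discarded rather than applied, so the delegate keeps the stale row [a, 1]. That is harmless for the append-only query in the test above, but for an updating query (for example a non-windowed GROUP BY aggregate) the delegate would accumulate outdated rows, so this shim only substitutes for a true retract sink when no retractions are emitted.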