How to insert into a retracting sink from a retracting stream with the Blink planner
I'm trying to take a Flink Table, convert it to a retracting stream, and wire it into a sink. I was able to do this with the original table planner using CRow, but Flink's Blink planner no longer seems to support CRow. Is there a way to accomplish this with the Blink planner?
For reference, we could previously do this by mapping the retracting stream to the CRow type before connecting it to a RetractStreamTableSink.
Below is a unit-test example of what I'm trying to accomplish; note that the commented-out code block works fine on the old planner.
This fails with the exception below, which makes sense because the retract stream has type Tuple2<Boolean, Row> while the sink expects Row, but I don't see a way to use a Tuple2 retract DataStream with a RetractStreamTableSink<Row>:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
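For background on why the query schema above has a leading BOOLEAN field: each record of a retract stream is a Tuple2<Boolean, Row>, where true means "add this row" and false means "retract a previously added row". A Flink-free sketch of that encoding in plain Java (Entry<Boolean, String> stands in for Tuple2<Boolean, Row>; the class and method names are illustrative, not Flink APIs):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public class RetractEncoding {
    // Materialize a retract changelog: a true flag adds the row, a false flag removes it.
    static List<String> materialize(List<Entry<Boolean, String>> changelog) {
        List<String> result = new ArrayList<>();
        for (Entry<Boolean, String> msg : changelog) {
            if (msg.getKey()) {
                result.add(msg.getValue());    // accumulate message
            } else {
                result.remove(msg.getValue()); // retraction message: drop first matching row
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry<Boolean, String>> changelog = List.of(
                new SimpleEntry<>(true, "key=1,id=2"),   // add
                new SimpleEntry<>(true, "key=3,id=4"),   // add
                new SimpleEntry<>(false, "key=1,id=2")); // retract the first row
        System.out.println(materialize(changelog)); // prints [key=3,id=4]
    }
}
```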
@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "2");
    SingleOutputStreamOperator<Row> source =
            executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);
    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // The code block below works on the Flink planner but fails on the Blink planner
    // because Blink treats all non-tuples as POJOs
    // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));

    // Create the retracting stream
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a sink
    TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
    CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
    RetractSink retractTableSink = new RetractSink(collectingTableSink);
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the table and the sink (this is what fails)
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
    System.out.println(collectingTableSink.rows);
}
So I found a way around this. If you create a shim interface that extends AppendStreamTableSink<Tuple2<Boolean, Row>>, give it default methods mirroring RetractStreamTableSink, and then override the consumeDataStream method as shown below, you can get from Tuple2 back to Row without CRow.
This is exactly what RetractStreamTableSink is supposed to be for, yet something causes Blink to fail when it is used, even though the AppendStreamTableSink and RetractStreamTableSink versions are identical (all methods overridden and equal; the only difference is the name of the interface you implement). I strongly suspect this is a bug in the Blink planner, but I haven't been able to pin down where it comes from.
The code snippet that performs the conversion:
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> filteredAndMapped =
            dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());
    return delegate.consumeDataStream(filteredAndMapped);
}
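Outside of Flink, the filter/map pair in consumeDataStream just keeps the accumulate messages and unwraps the row. A Flink-free sketch in plain Java (Entry<Boolean, String> again stands in for Tuple2<Boolean, Row>; names are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;

public class DropRetractions {
    // Equivalent of dataStream.filter(x -> x.f0).map(x -> x.f1):
    // keep only accumulate messages (flag == true) and unwrap the payload.
    static List<String> accumulatesOnly(List<Entry<Boolean, String>> changelog) {
        return changelog.stream()
                .filter(Entry::getKey)
                .map(Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Entry<Boolean, String>> changelog = List.of(
                new SimpleEntry<>(true, "row1"),
                new SimpleEntry<>(false, "row1"), // retraction is dropped, not applied
                new SimpleEntry<>(true, "row2"));
        System.out.println(accumulatesOnly(changelog)); // prints [row1, row2]
    }
}
```

Note that this silently discards retraction messages rather than applying them, which is fine for a collecting test sink like the one above but would leave stale rows in a sink that needs deletes to actually be performed.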