Flink SQL :使用变更日志流更新动态中的行 Table
Flink SQL : Use changelog stream to update rows in Dynamic Table
我有一个包含 JSON 消息的流,如下所示:
{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}
此流在注册为 TableSource
的 DataStream<Row>
中处理。
我想将此流用作 changelog 流 来更新 Flink Table 的内容,但我找不到这样做的方法。
我将 StreamTableSource
定义为:
public class MyTableSource implements StreamTableSource<Row>, ... {
@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream
.keyBy([SOME KEY]) // Aggregate by key
.map(new MyMapFunction()); // Map the update message with the correct encoding ?
return stream;
}
...
}
而这个TableSource
用于
public void process(final StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerTableSource("MyTableSource", new MyTableSource());
Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.
result.insertInto([SOME SINK]);
}
这样做的好方法是什么? (更具体地说,我如何使用流从 Table 中删除行?)
目前,内部变更日志处理功能未通过 API 公开。因此,没有可用的来源允许您将传入的变更日志解释为 table。这是为 Flink 1.11 计划的。
在那之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新:
我有一个包含 JSON 消息的流,如下所示:
{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}
此流在注册为 TableSource
的 DataStream<Row>
中处理。
我想将此流用作 changelog 流 来更新 Flink Table 的内容,但我找不到这样做的方法。
我将 StreamTableSource
定义为:
public class MyTableSource implements StreamTableSource<Row>, ... {
@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream
.keyBy([SOME KEY]) // Aggregate by key
.map(new MyMapFunction()); // Map the update message with the correct encoding ?
return stream;
}
...
}
而这个TableSource
用于
public void process(final StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerTableSource("MyTableSource", new MyTableSource());
Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.
result.insertInto([SOME SINK]);
}
这样做的好方法是什么? (更具体地说,我如何使用流从 Table 中删除行?)
目前,内部变更日志处理功能未通过 API 公开。因此,没有可用的来源允许您将传入的变更日志解释为 table。这是为 Flink 1.11 计划的。
在那之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新: