Flink:数据流到 Table

Flink : DataStream to Table

Usecase:从 Kafka 读取 protobuf 消息,反序列化它们,应用一些转换(展平一些列),然后写入 dynamodb。

遗憾的是,Kafka Flink Connector 仅支持 - csv、json 和 avro 格式。所以,我不得不使用较低级别的 API(数据流)。

问题: 如果我可以从数据流对象中创建一个 table,那么我可以在 [=26] 上接受对 运行 的查询=].它将使转换部分无缝且通用。 是否可以运行 SQL 查询数据流对象?

如果您有 DataStream 个对象,那么您只需使用 StreamTableEnvironment.

将给定的 DataStream 注册为 Table

这看起来或多或少像下面这样:

val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])

然后您应该能够查询从您的 DataStream 创建的动态 table。