Flink:数据流到 Table
Flink : DataStream to Table
Usecase:从 Kafka 读取 protobuf 消息,反序列化它们,应用一些转换(展平一些列),然后写入 dynamodb。
遗憾的是,Kafka Flink Connector 仅支持 - csv、json 和 avro 格式。所以,我不得不使用较低级别的 API(数据流)。
问题: 如果我可以从数据流对象中创建一个 table,那么我可以在 [=26] 上接受对 运行 的查询=].它将使转换部分无缝且通用。 是否可以运行 SQL 查询数据流对象?
如果您有 DataStream
个对象,那么您只需使用 StreamTableEnvironment
.
将给定的 DataStream
注册为 Table
这看起来或多或少像下面这样:
val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])
然后您应该能够查询从您的 DataStream 创建的动态 table。
Usecase:从 Kafka 读取 protobuf 消息,反序列化它们,应用一些转换(展平一些列),然后写入 dynamodb。
遗憾的是,Kafka Flink Connector 仅支持 - csv、json 和 avro 格式。所以,我不得不使用较低级别的 API(数据流)。
问题: 如果我可以从数据流对象中创建一个 table,那么我可以在 [=26] 上接受对 运行 的查询=].它将使转换部分无缝且通用。 是否可以运行 SQL 查询数据流对象?
如果您有 DataStream
个对象,那么您只需使用 StreamTableEnvironment
.
DataStream
注册为 Table
这看起来或多或少像下面这样:
val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])
然后您应该能够查询从您的 DataStream 创建的动态 table。