Apache Flink:如何使用 Table API 查询关系数据库?

Apache Flink: How to query a relational database with the Table API?

以下代码片段取自此blog post

val sensorTable = ??? // can be a CSV file, Kafka topic, database, or ...

// register the table source
tEnv.registerTableSource("sensors", sensorTable)

我想从关系数据库中读取数据。 Flink 有 TableSource 用于 JDBC 数据库吗?

在当前版本(1.4.0,2017 年 12 月)中,Flink 不提供内置的 TableSource 从关系数据库中提取数据。

不过,有一个JDBCInputFormat可以用。您可以

  • 使用它从使用 DataSet API 和 register the DataSet as a Table
  • 的数据库中读取数据
  • implement a JdbcTableSource 包裹 JdbcInputFormatJdbcTableSource 的简单实现应该很容易实现。实现对并行读取、投影或过滤器下推的支持需要更多的努力。