为 Flink 作业创建可分页 JDBC 源

Create pageable JDBC source for Flink Job

为了处理来自数据库的数据,我使用的是 flink。我用 jdbc 创建了输入。

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
             .setDrivername(driver)
             .setDBUrl(url)
             .setUsername(username)
             .setPassword(password)
             .setQuery("select id, name from users")
             .finish()

env.createInput(inputFormat)

问题是此输入正在从 table 获取所有数据。由于此 table 包含大量信息,我需要类似可分页的 jdbc 来源。我可以为此使用任何其他设置吗?

您可以将查询拆分为多个部分,通过将查询指定为参数化查询并提供要绑定参数的值来独立执行。

以下摘自JDBCInputFormat的JavaDoc。

 * <p>In order to query the JDBC source in parallel, you need to provide a
 * parameterized query template (i.e. a valid {@link PreparedStatement}) and
 * a {@link ParameterValuesProvider} which provides binding values for the
 * query parameters. E.g.:
 *
 * <pre><code>
 *
 * Serializable[][] queryParameters = new String[2][1];
 * queryParameters[0] = new String[]{"Kumar"};
 * queryParameters[1] = new String[]{"Tan Ah Teck"};
 *
 * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 *              .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
 *              .setDBUrl("jdbc:derby:memory:ebookshop")
 *              .setQuery("select * from books WHERE author = ?")
 *              .setRowTypeInfo(rowTypeInfo)
 *              .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
 *              .finish();
 * </code></pre>

注意:

  • 查询的 table 应该在参数化属性上有一个适当的索引。否则,您将对 table 进行多次全面扫描,这没有帮助。
  • 参数应该覆盖 table 的所有(必需)数据一次。否则您可能会错过某些行或查询某些行两次。