为 Flink 作业创建可分页 JDBC 源
Create pageable JDBC source for Flink Job
为了处理来自数据库的数据,我使用的是 flink。我用 jdbc 创建了输入。
val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(username)
.setPassword(password)
.setQuery("select id, name from users")
.finish()
env.createInput(inputFormat)
问题是此输入正在从 table 获取所有数据。由于此 table 包含大量信息,我需要类似可分页的 jdbc 来源。我可以为此使用任何其他设置吗?
您可以将查询拆分为多个部分,通过将查询指定为参数化查询并提供要绑定参数的值来独立执行。
以下摘自JDBCInputFormat
的JavaDoc。
* <p>In order to query the JDBC source in parallel, you need to provide a
* parameterized query template (i.e. a valid {@link PreparedStatement}) and
* a {@link ParameterValuesProvider} which provides binding values for the
* query parameters. E.g.:
*
* <pre><code>
*
* Serializable[][] queryParameters = new String[2][1];
* queryParameters[0] = new String[]{"Kumar"};
* queryParameters[1] = new String[]{"Tan Ah Teck"};
*
* JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
* .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
* .setDBUrl("jdbc:derby:memory:ebookshop")
* .setQuery("select * from books WHERE author = ?")
* .setRowTypeInfo(rowTypeInfo)
* .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
* .finish();
* </code></pre>
注意:
- 查询的 table 应该在参数化属性上有一个适当的索引。否则,您将对 table 进行多次全面扫描,这没有帮助。
- 参数应该覆盖 table 的所有(必需)数据一次。否则您可能会错过某些行或查询某些行两次。
为了处理来自数据库的数据,我使用的是 flink。我用 jdbc 创建了输入。
val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(username)
.setPassword(password)
.setQuery("select id, name from users")
.finish()
env.createInput(inputFormat)
问题是此输入正在从 table 获取所有数据。由于此 table 包含大量信息,我需要类似可分页的 jdbc 来源。我可以为此使用任何其他设置吗?
您可以将查询拆分为多个部分,通过将查询指定为参数化查询并提供要绑定参数的值来独立执行。
以下摘自JDBCInputFormat
的JavaDoc。
* <p>In order to query the JDBC source in parallel, you need to provide a
* parameterized query template (i.e. a valid {@link PreparedStatement}) and
* a {@link ParameterValuesProvider} which provides binding values for the
* query parameters. E.g.:
*
* <pre><code>
*
* Serializable[][] queryParameters = new String[2][1];
* queryParameters[0] = new String[]{"Kumar"};
* queryParameters[1] = new String[]{"Tan Ah Teck"};
*
* JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
* .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
* .setDBUrl("jdbc:derby:memory:ebookshop")
* .setQuery("select * from books WHERE author = ?")
* .setRowTypeInfo(rowTypeInfo)
* .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
* .finish();
* </code></pre>
注意:
- 查询的 table 应该在参数化属性上有一个适当的索引。否则,您将对 table 进行多次全面扫描,这没有帮助。
- 参数应该覆盖 table 的所有(必需)数据一次。否则您可能会错过某些行或查询某些行两次。