有没有办法使用 apache beam 管道进行动态查询或执行多个查询?
Is there a way to have a dynamic query or execute multiple queries with an apache beam pipeline?
我正在使用 apache beam 和 google 云数据流将信息插入云 SQL 数据库。到目前为止,这对一个 table 来说一直很有效。正在发送的信息正在扩展,包括发往数据库中另一个 table 的信息。
我很好奇是否有一种方法可以根据我收到的信息动态使用 SQL 查询,或者我是否能够以某种方式创建管道来执行多个查询?两者都行...
或者,我是否坚持必须创建一个单独的管道?
干杯,
编辑:添加我当前的管道配置
MainPipeline = Pipeline.create(options);
MainPipeline.apply(PubsubIO.readStrings().fromSubscription(MAIN_SUBSCRIPTION))
.apply(JdbcIO.<String> write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.cj.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PASS))
.withStatement(QUERY_SQL).withPreparedStatementSetter(new NewPreparedStatementSetter() {
}));
我认为您不能根据输入元素对 JdbcIO
进行动态查询,据我所知,它在构造时配置一次。
但是,如果它们适合您的用例,我可以想到几个潜在的解决方法。
一种是编写您自己的 ParDo
,您可以在其中手动调用 JDBC 驱动程序。这将基本上重新实现 JdbcIO
的某些部分并添加新功能。这样ParDo
可以随心所欲
另一种是将输入PColleciton
拆分为multiple outputs。如果您的用例仅限于您可以根据输入从中选择的一些预定义查询集,那么这将起作用。通过这种方式,您可以将输入拆分为多个 PCollections
,然后将不同配置的 IOs 附加到每个
我正在使用 apache beam 和 google 云数据流将信息插入云 SQL 数据库。到目前为止,这对一个 table 来说一直很有效。正在发送的信息正在扩展,包括发往数据库中另一个 table 的信息。
我很好奇是否有一种方法可以根据我收到的信息动态使用 SQL 查询,或者我是否能够以某种方式创建管道来执行多个查询?两者都行...
或者,我是否坚持必须创建一个单独的管道?
干杯,
编辑:添加我当前的管道配置
MainPipeline = Pipeline.create(options);
MainPipeline.apply(PubsubIO.readStrings().fromSubscription(MAIN_SUBSCRIPTION))
.apply(JdbcIO.<String> write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.cj.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PASS))
.withStatement(QUERY_SQL).withPreparedStatementSetter(new NewPreparedStatementSetter() {
}));
我认为您不能根据输入元素对 JdbcIO
进行动态查询,据我所知,它在构造时配置一次。
但是,如果它们适合您的用例,我可以想到几个潜在的解决方法。
一种是编写您自己的 ParDo
,您可以在其中手动调用 JDBC 驱动程序。这将基本上重新实现 JdbcIO
的某些部分并添加新功能。这样ParDo
可以随心所欲
另一种是将输入PColleciton
拆分为multiple outputs。如果您的用例仅限于您可以根据输入从中选择的一些预定义查询集,那么这将起作用。通过这种方式,您可以将输入拆分为多个 PCollections
,然后将不同配置的 IOs 附加到每个