有没有办法使用 apache beam 管道进行动态查询或执行多个查询?

Is there a way to have a dynamic query or execute multiple queries with an apache beam pipeline?

我正在使用 apache beam 和 google 云数据流将信息插入云 SQL 数据库。到目前为止,这对一个 table 来说一直很有效。正在发送的信息正在扩展,包括发往数据库中另一个 table 的信息。

我很好奇是否有一种方法可以根据我收到的信息动态使用 SQL 查询,或者我是否能够以某种方式创建管道来执行多个查询?两者都行...

或者,我是否坚持必须创建一个单独的管道?

干杯,

编辑:添加我当前的管道配置

MainPipeline = Pipeline.create(options);

    MainPipeline.apply(PubsubIO.readStrings().fromSubscription(MAIN_SUBSCRIPTION))
    .apply(JdbcIO.<String> write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.cj.jdbc.Driver", JDBC_URL)
            .withUsername(JDBC_USER).withPassword(JDBC_PASS))
        .withStatement(QUERY_SQL).withPreparedStatementSetter(new NewPreparedStatementSetter() {
        }));

我认为您不能根据输入元素对 JdbcIO 进行动态查询,据我所知,它在构造时配置一次。

但是,如果它们适合您的用例,我可以想到几个潜在的解决方法。

一种是编写您自己的 ParDo,您可以在其中手动调用 JDBC 驱动程序。这将基本上重新实现 JdbcIO 的某些部分并添加新功能。这样ParDo可以随心所欲

另一种是将输入PColleciton拆分为multiple outputs。如果您的用例仅限于您可以根据输入从中选择的一些预定义查询集,那么这将起作用。通过这种方式,您可以将输入拆分为多个 PCollections,然后将不同配置的 IOs 附加到每个