如何将 Side Inputs/Extra 输入传递给 JdbcIO RowMapper Java

How to pass Side Inputs/Extra Input to JdbcIO RowMapper Java

我正在尝试使用 JdbcIO.Read 在 java 光束中读取云 SQL table。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record 。有没有一种方法可以将 JSON Schema String 作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView

我已经尝试执行两个读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和我的 Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read

读取 table

我正在像这样即时生成 table 的 Avro 模式:

PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
                .withDataSourceConfiguration(config)
                .withCoder(StringUtf8Coder.of())
                .withQuery("SELECT DISTINCT column_name, data_type \n" +
                        "FROM information_schema.columns\n" +
                        "WHERE table_name = " + "'" + tableName + "'")
                .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
            // code here to generate avro schema string
           // this works fine for me

}))

正在创建 PCollectionView,它将为每个 table.

保存我的 json 架构
 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());

// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;

pipeline.apply(JdbcIO.<String>read()
        .withDataSourceConfiguration(config)
        .withCoder(StringUtf8Coder.of())
        .withQuery(queryString)
        .withRowMapper(new JdbcIO.RowMapper<String>() {

            @Override
            public String mapRow(ResultSet resultSet) throws Exception {
                // access schema here and use it to parse and create 
               //GenericData.Record from ResultSet fields as per schema

                return null;
            }
        })).

    withSideInputs(My PCollectionView here); // this option is not there right now.

有没有更好的方法来解决这个问题?

此时IOsAPI不接受SideInputs。

读取后立即添加ParDo并在那里进行映射应该是可行的。 ParDo 可以接受边输入。