如何将 Side Inputs/Extra 输入传递给 JdbcIO RowMapper Java
How to pass Side Inputs/Extra Input to JdbcIO RowMapper Java
我正在尝试使用 JdbcIO.Read 在 java 光束中读取云 SQL table。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record 。有没有一种方法可以将 JSON Schema String 作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView
我已经尝试执行两个读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和我的 Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read
读取 table
我正在像这样即时生成 table 的 Avro 模式:
PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery("SELECT DISTINCT column_name, data_type \n" +
"FROM information_schema.columns\n" +
"WHERE table_name = " + "'" + tableName + "'")
.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
// code here to generate avro schema string
// this works fine for me
}))
正在创建 PCollectionView,它将为每个 table.
保存我的 json 架构
PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());
// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;
pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery(queryString)
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
// access schema here and use it to parse and create
//GenericData.Record from ResultSet fields as per schema
return null;
}
})).
withSideInputs(My PCollectionView here); // this option is not there right now.
有没有更好的方法来解决这个问题?
此时IOsAPI不接受SideInputs。
读取后立即添加ParDo并在那里进行映射应该是可行的。 ParDo 可以接受边输入。
我正在尝试使用 JdbcIO.Read 在 java 光束中读取云 SQL table。我想使用 .withRowMapper(Resultset resultSet) 方法将 Resultset 中的每一行转换为 GenericData.Record 。有没有一种方法可以将 JSON Schema String 作为 .withRowMapper 方法中的输入传递,例如 ParDo 接受 sideInputs 作为 PCollectionView
我已经尝试执行两个读取操作(在同一 JdbcIO.Read 转换中从 information_schema.columns 和我的 Table 读取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read
读取 table我正在像这样即时生成 table 的 Avro 模式:
PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery("SELECT DISTINCT column_name, data_type \n" +
"FROM information_schema.columns\n" +
"WHERE table_name = " + "'" + tableName + "'")
.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
// code here to generate avro schema string
// this works fine for me
}))
正在创建 PCollectionView,它将为每个 table.
保存我的 json 架构 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());
// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;
pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery(queryString)
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
// access schema here and use it to parse and create
//GenericData.Record from ResultSet fields as per schema
return null;
}
})).
withSideInputs(My PCollectionView here); // this option is not there right now.
有没有更好的方法来解决这个问题?
此时IOsAPI不接受SideInputs。
读取后立即添加ParDo并在那里进行映射应该是可行的。 ParDo 可以接受边输入。