使用数据流 JdbcIO api 写入云 sql
Write into cloud sql using dataflow JdbcIO api
我有一个要求,我必须使用云数据流 API 将 PCollection 的字符串写入云 SQL。
pipeline.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
.apply(JdbcIO.write()
.withDataSourceConfiguration(DataSourceConfiguration
.create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
.withUsername("**").withPassword("password10"))
.withStatement("insert into person values(?,?)")
.withPreparedStatementSetter(
new JdbcIO.PreparedStatementSetter < Object > () {
/**
*
*/
private static final long serialVersionUID = 1 L;
@Override
public void setParameters(Object arg0, PreparedStatement query)
throws Exception {
// TODO Auto-generated method stub
query.setString(1, "Hello");
query.setString(1, "Hi");
}
}));
这是我正在尝试的示例代码。我想做的事情的一个非常简单的版本。
此外,使用 parDo 并编写简单的插入语句从 Dataflow 写入 Cloud SQL 是否可行?
之前的转换输出 PCollection<String>
,因此您需要指定它是 JdbcIO<T>.write()
的输入类型
像这样:
pipeline
.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
.apply(JdbcIO.<String>write().withDataSourceConfiguration(
DataSourceConfiguration.create("org.postgresql.Driver","jdbc:postgresql://***:5432/test")
.withUsername("**")
.withPassword("password10"))
.withStatement("insert into person values(?,?)")
.withPreparedStatementSetter((element, query) -> {
query.setInt(1, 1);
query.setString(2, "Hello");
})
);
我有一个要求,我必须使用云数据流 API 将 PCollection 的字符串写入云 SQL。
pipeline.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
.apply(JdbcIO.write()
.withDataSourceConfiguration(DataSourceConfiguration
.create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
.withUsername("**").withPassword("password10"))
.withStatement("insert into person values(?,?)")
.withPreparedStatementSetter(
new JdbcIO.PreparedStatementSetter < Object > () {
/**
*
*/
private static final long serialVersionUID = 1 L;
@Override
public void setParameters(Object arg0, PreparedStatement query)
throws Exception {
// TODO Auto-generated method stub
query.setString(1, "Hello");
query.setString(1, "Hi");
}
}));
这是我正在尝试的示例代码。我想做的事情的一个非常简单的版本。
此外,使用 parDo 并编写简单的插入语句从 Dataflow 写入 Cloud SQL 是否可行?
之前的转换输出 PCollection<String>
,因此您需要指定它是 JdbcIO<T>.write()
像这样:
pipeline
.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
.apply(JdbcIO.<String>write().withDataSourceConfiguration(
DataSourceConfiguration.create("org.postgresql.Driver","jdbc:postgresql://***:5432/test")
.withUsername("**")
.withPassword("password10"))
.withStatement("insert into person values(?,?)")
.withPreparedStatementSetter((element, query) -> {
query.setInt(1, 1);
query.setString(2, "Hello");
})
);