如何在 Beam 中进行周期性工作?
How to do periodic job in Beam?
我想每十分钟做一次周期性工作(cron-job),从 Postgresql
加载数据。 GenerateSequence
是用来实现的,下面是代码。
PostgresOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PostgresOptions.class);
// create pipeline
Pipeline pipeline = Pipeline.create(options);
PGSimpleDataSource pgDataSource = getPostgresDataSource(options);
// run sequence
PCollection<KV<String, Float>> sequence = pipeline.apply(
"Generate Sequence",
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10))
).apply(
"Read Postgresql",
ParDo.of(new ReadPG(pgDataSource))
);
...
private static class ReadPG extends DoFn<Long, KV<String, Float>> {
private PGSimpleDataSource pgDataSource;
public ReadPG(PGSimpleDataSource source) {
pgDataSource = source;
}
@ProcessElement
public void processElement( @Element Long seq, final OutputReceiver<KV<String, Float>> receiver) {
JdbcIO.<KV<String, Float>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.pgDataSource))
.withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
.withRowMapper(new JdbcIO.RowMapper<KV<String, Float>>() {
public KV<String, Float> mapRow(ResultSet resultSet) throws Exception {
KV<String, Float> kv = KV.of(resultSet.getString("id"), resultSet.getFloat("key1"));
while (resultSet.next()) {
System.out.println(resultSet.getString("id"));
}
receiver.output(kv);
return kv;
}
})
.withQuery("select * from table_name limit 10;");
}
}
但是,没有从 Postgresql
加载数据。我的代码有什么问题吗?
光束版本:2.15.0
任何 Beam PTransform 都必须是管道的一部分,您不能仅在 processElement()
内部使用它,因为它不会被 运行ner 转换为 DAG 的一部分。如果你正在寻找 composite transform 那么它可以通过覆盖其他 PTransform 的 expand()
方法来实现。
同时,Beam 不打算将作业安排到后端处理引擎上的 运行,因此应该在 Beam 之外执行。提到了一些选项 here.
最后,周期性作业通过GenerateSequence
和Window
完成。
这里是示例代码
pipeline.apply(
"Generate Sequence",
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(WindowInterval))
)
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(WindowInterval))))
.apply("Read Data from PG", new ReadPG(pgDataSource))
我想每十分钟做一次周期性工作(cron-job),从 Postgresql
加载数据。 GenerateSequence
是用来实现的,下面是代码。
PostgresOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PostgresOptions.class);
// create pipeline
Pipeline pipeline = Pipeline.create(options);
PGSimpleDataSource pgDataSource = getPostgresDataSource(options);
// run sequence
PCollection<KV<String, Float>> sequence = pipeline.apply(
"Generate Sequence",
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10))
).apply(
"Read Postgresql",
ParDo.of(new ReadPG(pgDataSource))
);
...
private static class ReadPG extends DoFn<Long, KV<String, Float>> {
private PGSimpleDataSource pgDataSource;
public ReadPG(PGSimpleDataSource source) {
pgDataSource = source;
}
@ProcessElement
public void processElement( @Element Long seq, final OutputReceiver<KV<String, Float>> receiver) {
JdbcIO.<KV<String, Float>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.pgDataSource))
.withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
.withRowMapper(new JdbcIO.RowMapper<KV<String, Float>>() {
public KV<String, Float> mapRow(ResultSet resultSet) throws Exception {
KV<String, Float> kv = KV.of(resultSet.getString("id"), resultSet.getFloat("key1"));
while (resultSet.next()) {
System.out.println(resultSet.getString("id"));
}
receiver.output(kv);
return kv;
}
})
.withQuery("select * from table_name limit 10;");
}
}
但是,没有从 Postgresql
加载数据。我的代码有什么问题吗?
光束版本:2.15.0
任何 Beam PTransform 都必须是管道的一部分,您不能仅在 processElement()
内部使用它,因为它不会被 运行ner 转换为 DAG 的一部分。如果你正在寻找 composite transform 那么它可以通过覆盖其他 PTransform 的 expand()
方法来实现。
同时,Beam 不打算将作业安排到后端处理引擎上的 运行,因此应该在 Beam 之外执行。提到了一些选项 here.
最后,周期性作业通过GenerateSequence
和Window
完成。
这里是示例代码
pipeline.apply(
"Generate Sequence",
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(WindowInterval))
)
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(WindowInterval))))
.apply("Read Data from PG", new ReadPG(pgDataSource))