如何在 Beam 中进行周期性工作?

How to do periodic job in Beam?

我想每十分钟做一次周期性工作(cron-job),从 Postgresql 加载数据。 GenerateSequence是用来实现的,下面是代码。

    PostgresOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PostgresOptions.class);

    // create pipeline
    Pipeline pipeline = Pipeline.create(options);
    PGSimpleDataSource pgDataSource = getPostgresDataSource(options);

    // run sequence
    PCollection<KV<String, Float>> sequence = pipeline.apply(
            "Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10))
    ).apply(
        "Read Postgresql",
            ParDo.of(new ReadPG(pgDataSource))
    );

...

    private static class ReadPG extends DoFn<Long, KV<String, Float>> {
        private PGSimpleDataSource pgDataSource;
        public ReadPG(PGSimpleDataSource source) {
            pgDataSource = source;
        }

        @ProcessElement
        public void processElement( @Element Long seq, final OutputReceiver<KV<String, Float>> receiver) {
            JdbcIO.<KV<String, Float>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.pgDataSource))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
                .withRowMapper(new JdbcIO.RowMapper<KV<String, Float>>() {
                    public KV<String, Float> mapRow(ResultSet resultSet) throws Exception {
                        KV<String, Float> kv = KV.of(resultSet.getString("id"), resultSet.getFloat("key1"));
                        while (resultSet.next()) {
                            System.out.println(resultSet.getString("id"));
                        }
                        receiver.output(kv);

                        return kv;
                    }
                })
                .withQuery("select * from table_name limit 10;");
        }
    }

但是,没有从 Postgresql 加载数据。我的代码有什么问题吗?

光束版本:2.15.0

任何 Beam PTransform 都必须是管道的一部分,您不能仅在 processElement() 内部使用它,因为它不会被 运行ner 转换为 DAG 的一部分。如果你正在寻找 composite transform 那么它可以通过覆盖其他 PTransform 的 expand() 方法来实现。

同时,Beam 不打算将作业安排到后端处理引擎上的 运行,因此应该在 Beam 之外执行。提到了一些选项 here.

最后,周期性作业通过GenerateSequenceWindow完成。

这里是示例代码

        pipeline.apply(
                "Generate Sequence",
                GenerateSequence.from(0).withRate(1, Duration.standardMinutes(WindowInterval))
        )
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(WindowInterval))))
        .apply("Read Data from PG", new ReadPG(pgDataSource))