Flink StreamingFileSink 没有将数据写入 AWS S3

Flink StreamingFileSink not writing data to AWS S3

我有一个代表数据流的集合并测试 StreamingFileSink 以将流写入 S3。编程 运行 成功,但给定的 S3 路径中没有数据。

    public class S3Sink {

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.enableCheckpointing(100);

        List<String> input = new ArrayList<>();
        input.add("test");

        DataStream<String> inputStream = see.fromCollection(input);

        RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();

        StreamingFileSink s3Sink = StreamingFileSink.
                forRowFormat(new Path("<S3 Path>"),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(rollingPolicy)
                .build();


        inputStream.addSink(s3Sink);

        see.execute();
    }
}

检查点也已启用。关于为什么 Sink 没有按预期工作有什么想法吗?

更新: 根据 David 的回答,创建了连续生成随机字符串的自定义源,我希望检查点在配置的时间间隔后触发以将数据写入 S3。

public class S3SinkCustom {

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.enableCheckpointing(1000);

        DataStream<String> inputStream = see.addSource(new CustomSource());

        RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();

        StreamingFileSink s3Sink = StreamingFileSink.
                forRowFormat(new Path("s3://mybucket/data/"),
                new SimpleStringEncoder<>("UTF-8"))
                .build();


        //inputStream.print();

        inputStream.addSink(s3Sink);

        see.execute();
    }

    static class CustomSource extends RichSourceFunction<String> {

        private volatile boolean running = false;

        final String[] strings = {"ABC", "XYZ", "DEF"};

        @Override
        public void open(Configuration parameters){
            running = true;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (running) {
                Random random = new Random();
                int index = random.nextInt(strings.length);
                sourceContext.collect(strings[index]);
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

}

仍然,s3 中没有数据,Flink 进程甚至没有验证给定的 S3 存储桶是否有效,但是进程 运行 没有任何问题。

更新:

以下是自定义滚动策略的详细信息:

public class CustomRollingPolicy implements RollingPolicy<Object, String> {

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
        return partFileInfo.getSize() > 1;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
        return true;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
        return true;
    }
}

我认为问题在于您编写的作业不会 运行 足够长以实际检查点,因此输出不会最终确定。

另一个潜在的问题是 StreamingFileSink 只适用于基于 Hadoop 的 S3 文件系统(而不是来自 Presto 的文件系统)。

在使用 fs.s3a.access.key、fs.s3a.secret.key.

等必需的 s3a 属性设置 flink-conf.yaml 后,上述问题得到解决

我们还需要让 Flink 知道配置位置。

FileSystem.initialize(GlobalConfiguration.loadConfiguration(""));

通过这些更改,我能够 运行 从本地接收 S3,并将消息持久保存到 S3,没有任何问题。