Flink StreamingFileSink 没有将数据写入 AWS S3
Flink StreamingFileSink not writing data to AWS S3
我有一个代表数据流的集合并测试 StreamingFileSink 以将流写入 S3。编程 运行 成功,但给定的 S3 路径中没有数据。
public class S3Sink {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(100);
List<String> input = new ArrayList<>();
input.add("test");
DataStream<String> inputStream = see.fromCollection(input);
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("<S3 Path>"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build();
inputStream.addSink(s3Sink);
see.execute();
}
}
检查点也已启用。关于为什么 Sink 没有按预期工作有什么想法吗?
更新:
根据 David 的回答,创建了连续生成随机字符串的自定义源,我希望检查点在配置的时间间隔后触发以将数据写入 S3。
public class S3SinkCustom {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(1000);
DataStream<String> inputStream = see.addSource(new CustomSource());
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("s3://mybucket/data/"),
new SimpleStringEncoder<>("UTF-8"))
.build();
//inputStream.print();
inputStream.addSink(s3Sink);
see.execute();
}
static class CustomSource extends RichSourceFunction<String> {
private volatile boolean running = false;
final String[] strings = {"ABC", "XYZ", "DEF"};
@Override
public void open(Configuration parameters){
running = true;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while (running) {
Random random = new Random();
int index = random.nextInt(strings.length);
sourceContext.collect(strings[index]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
仍然,s3 中没有数据,Flink 进程甚至没有验证给定的 S3 存储桶是否有效,但是进程 运行 没有任何问题。
更新:
以下是自定义滚动策略的详细信息:
public class CustomRollingPolicy implements RollingPolicy<Object, String> {
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
return partFileInfo.getSize() > 1;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
return true;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
return true;
}
}
我认为问题在于您编写的作业不会 运行 足够长以实际检查点,因此输出不会最终确定。
另一个潜在的问题是 StreamingFileSink 只适用于基于 Hadoop 的 S3 文件系统(而不是来自 Presto 的文件系统)。
在使用 fs.s3a.access.key、fs.s3a.secret.key.
等必需的 s3a 属性设置 flink-conf.yaml 后,上述问题得到解决
我们还需要让 Flink 知道配置位置。
FileSystem.initialize(GlobalConfiguration.loadConfiguration(""));
通过这些更改,我能够 运行 从本地接收 S3,并将消息持久保存到 S3,没有任何问题。
我有一个代表数据流的集合并测试 StreamingFileSink 以将流写入 S3。编程 运行 成功,但给定的 S3 路径中没有数据。
public class S3Sink {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(100);
List<String> input = new ArrayList<>();
input.add("test");
DataStream<String> inputStream = see.fromCollection(input);
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("<S3 Path>"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build();
inputStream.addSink(s3Sink);
see.execute();
}
}
检查点也已启用。关于为什么 Sink 没有按预期工作有什么想法吗?
更新: 根据 David 的回答,创建了连续生成随机字符串的自定义源,我希望检查点在配置的时间间隔后触发以将数据写入 S3。
public class S3SinkCustom {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(1000);
DataStream<String> inputStream = see.addSource(new CustomSource());
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("s3://mybucket/data/"),
new SimpleStringEncoder<>("UTF-8"))
.build();
//inputStream.print();
inputStream.addSink(s3Sink);
see.execute();
}
static class CustomSource extends RichSourceFunction<String> {
private volatile boolean running = false;
final String[] strings = {"ABC", "XYZ", "DEF"};
@Override
public void open(Configuration parameters){
running = true;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while (running) {
Random random = new Random();
int index = random.nextInt(strings.length);
sourceContext.collect(strings[index]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
仍然,s3 中没有数据,Flink 进程甚至没有验证给定的 S3 存储桶是否有效,但是进程 运行 没有任何问题。
更新:
以下是自定义滚动策略的详细信息:
public class CustomRollingPolicy implements RollingPolicy<Object, String> {
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
return partFileInfo.getSize() > 1;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
return true;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
return true;
}
}
我认为问题在于您编写的作业不会 运行 足够长以实际检查点,因此输出不会最终确定。
另一个潜在的问题是 StreamingFileSink 只适用于基于 Hadoop 的 S3 文件系统(而不是来自 Presto 的文件系统)。
在使用 fs.s3a.access.key、fs.s3a.secret.key.
等必需的 s3a 属性设置 flink-conf.yaml 后,上述问题得到解决我们还需要让 Flink 知道配置位置。
FileSystem.initialize(GlobalConfiguration.loadConfiguration(""));
通过这些更改,我能够 运行 从本地接收 S3,并将消息持久保存到 S3,没有任何问题。