Flink:如何在使用 writeFileAsText(path) 写入文件时传递动态路径?

Flink: How to pass a dynamic path while writing to files using writeFileAsText(path)?

假设我有一个包含 String 类型元素的流。我想将流中的每个元素写入某个文件夹中的单独文件。我正在使用以下设置。

stream.writeAsText(path).setParallelism(1);

如何使这条路径动态化?我什至尝试将 System.nanotime() 添加到路径中以使其动态化。但它似乎仍然不起作用,所有内容都被写入一个文件。

你的问题是 DataStream.writeAsText() 一次将流的全部内容写入文件,所以你只会得到一个文件。

看起来这将 return 一个集合,您可以使用它来将字符串输出为不同的文件。

dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
    throws Exception {
    for(String word: value.split(" ")){
        out.collect(word);
    }
}
});

直接取自此处的文档:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html

Flink Rolling File Sink with a custom bucketer, or the newer and prefered Streaming File Sink 使用自定义 BucketAssigner 和 RollingPolicy 明确支持这种用例。