关于 flink stream sink to hdfs

Regarding flink stream sink to hdfs

我正在编写一个 flink 代码,其中我从本地系统读取文件并使用 "writeUsingOutputFormat" 将其写入数据库。

现在我的要求是写入hdfs而不是数据库。

你能帮我看看我在flink中该怎么做吗?

注意:hdfs 已启动并且 运行 在我的本地计算机上。

Flink 提供HDFS connector which can be used to write data to any file system supported by Hadoop Filesystem.

提供的接收器是一个 Bucketing 接收器,它将数据流分区到包含滚动文件的文件夹中。可以使用 batch sizebatch roll over time interval

等参数配置分桶行为以及写入

Flink文档给出了如下例子-

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink);

目前较新的 Streaming File Sink 可能是比 Bucketing Sink 更好的选择。这个描述来自Flink 1.6 release notes(注意Flink 1.7增加了对S3的支持):

The new StreamingFileSink is an exactly-once sink for writing to filesystems which capitalizes on the knowledge acquired from the previous BucketingSink. Exactly-once is supported through integration of the sink with Flink’s checkpointing mechanism. The new sink is built upon Flink’s own FileSystem abstraction and it supports local file system and HDFS, with plans for S3 support in the near future [now included in Flink 1.7]. It exposes pluggable file rolling and bucketing policies. Apart from row-wise encoding formats, the new StreamingFileSink comes with support for Parquet. Other bulk-encoding formats like ORC can be easily added using the exposed APIs.