Generating a single output file for each processed input file in Apache Flink
I am building an ETL with Scala and Apache Flink that periodically reads all the files under a directory on the local file system and writes the result of processing each file to a single output file in another directory.
So an example of this would be:
/dir/to/input/files/file1
/dir/to/input/files/file2
/dir/to/input/files/file3
and the output of the ETL would be exactly:
/dir/to/output/files/file1
/dir/to/output/files/file2
/dir/to/output/files/file3
I have tried various approaches, including reducing the parallelism to one when writing to the data sink, but I still cannot get the required result.
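By "reducing the parallelism to one" I mean pinning the write to a single task, along these lines (a minimal sketch of one variant I tried; the out.txt file name is just a placeholder):

countPair
  .writeAsText("/path/to/output/directory/out.txt", FileSystem.WriteMode.NO_OVERWRITE)
  .setParallelism(1)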
This is my current code:
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.joda.time.DateTime

case class WordWithCount(word: String, count: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val path = "/path/to/input/files/"
val format = new TextInputFormat(new Path(path))

// Monitor the input directory continuously, re-scanning it every 10 ms
val socketStream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)

// Split each line on commas and keep a running count per word
val wordsStream = socketStream.flatMap(value => value.split(",")).map(value => WordWithCount(value, 1))
val keyValuePair = wordsStream.keyBy(_.word)
val countPair = keyValuePair.sum("count")
countPair.print()

// Write the running counts to a file named after the current hour/minute/second
countPair.writeAsText(
  "/path/to/output/directory/" +
    DateTime.now().getHourOfDay.toString +
    DateTime.now().getMinuteOfHour.toString +
    DateTime.now().getSecondOfMinute.toString,
  FileSystem.WriteMode.NO_OVERWRITE)
// The first write method I tried:
val sink = new BucketingSink[WordWithCount]("/path/to/output/directory/")
sink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm"))

// The second write method I tried:
val sink3 = new BucketingSink[WordWithCount]("/path/to/output/directory/")
sink3.setUseTruncate(false)
sink3.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm"))
sink3.setWriter(new StringWriter[WordWithCount])
sink3.setBatchSize(3)
sink3.setPendingPrefix("file-")
sink3.setPendingSuffix(".txt")
Neither write method produces the result I want.
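For completeness, this is roughly how I attach the bucketing sink and run the job (a minimal sketch using the sink3 configuration above; only the addSink, setParallelism and execute calls are assumed here, everything else is shown earlier):

countPair
  .addSink(sink3)
  .setParallelism(1)

env.execute()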
Could someone with Apache Flink experience please point me toward the right way to write the output?
I solved this issue by importing the following dependencies to run on my local machine:
- hadoop-aws-2.7.3.jar
- aws-java-sdk-s3-1.11.183.jar
- aws-java-sdk-core-1.11.183.jar
- aws-java-sdk-kms-1.11.183.jar
- jackson-annotations-2.6.7.jar
- jackson-core-2.6.7.jar
- jackson-databind-2.6.7.jar
- joda-time-2.8.1.jar
- httpcore-4.4.4.jar
- httpclient-4.5.3.jar
You can check this in the section "Provide S3 FileSystem Dependency" at https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html
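In case it helps anyone who builds with sbt instead of copying the jars into Flink's lib/ folder by hand (the route described on that docs page), the same dependencies would look roughly like this (a sketch, assuming the exact versions listed above):

// build.sbt (sketch of the dependency list above)
libraryDependencies ++= Seq(
  "org.apache.hadoop"          % "hadoop-aws"          % "2.7.3",
  "com.amazonaws"              % "aws-java-sdk-s3"     % "1.11.183",
  "com.amazonaws"              % "aws-java-sdk-core"   % "1.11.183",
  "com.amazonaws"              % "aws-java-sdk-kms"    % "1.11.183",
  "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7",
  "com.fasterxml.jackson.core" % "jackson-core"        % "2.6.7",
  "com.fasterxml.jackson.core" % "jackson-databind"    % "2.6.7",
  "joda-time"                  % "joda-time"           % "2.8.1",
  "org.apache.httpcomponents"  % "httpcore"            % "4.4.4",
  "org.apache.httpcomponents"  % "httpclient"          % "4.5.3"
)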