为什么flink程序重启后有很多in-progress文件?
why are there many in-progress files after restarting flink program?
我使用flink消费kafka,并以parquet格式保存到hdfs。现在我发现我的目标目录中有很多正在进行的文件,当我重新启动我的 flink 程序时,这些文件不会作为目标目录中的文件关闭。
我的环境:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getConfig.registerTypeWithKryoSerializer(classOf[MyMessage],classOf[ProtobufSerializer])
//sinks
val bucketAssigner = new DateTimeBucketAssigner[myCounter]("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))
val streamingFileSink = StreamingFileSink.
forBulkFormat(path, ParquetAvroWriters.forSpecificRecord(classOf[myCounter]))
.withBucketCheckInterval(60000)
.withBucketAssigner(bucketAssigner).build
-rw-r--r-- 3 Administrator hdfs 1629 2019-08-05 17:06 /user/data/2019-08-05/.part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:07 /user/data/2019-08-05/.part-2-1.inprogress.ac0d8b56-b8f0-4893-9e55-5374b69f16cc
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:08 /user/data/2019-08-05/.part-2-2.inprogress.a427c2e2-d689-42b8-aa3d-77873c5654f2
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:09 /user/data/2019-08-05/.part-2-3.inprogress.b5c746e3-354d-4ab3-b1a4-8c6bd88ae430
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:59 /user/data/2019-08-05/.part-2-3.inprogress.e286d995-3fa7-4696-b51a-27378412a35c
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:00 /user/data/2019-08-05/.part-2-4.inprogress.bcde4f30-2f78-4f54-92ad-9bc54ac57c5c
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:10 /user/data/2019-08-05/.part-2-4.inprogress.dbce8a00-6514-43dc-8b31-36c5a8665d37
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 17:10 /user/data/2019-08-05/.part-2-5.inprogress.34e53418-f5af-4279-87ef-6a27549d90fe
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:01 /user/data/2019-08-05/.part-2-5.inprogress.936cdb63-4fe2-41bf-b839-2861030c5516
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 16:55 /user/data/2019-08-05/.part-2-6.inprogress.7a7099a6-9dcd-450b-af2c-8a676276ef0a
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 17:01 /user/data/2019-08-05/.part-2-6.inprogress.b57f548f-45fc-497c-9807-ef18dba3d11d
-rw-r--r-- 3 Administrator hdfs 1574 2019-08-05 16:56 /user/data/2019-08-05/part-2-0
-rw-r--r-- 3 Administrator hdfs 1868 2019-08-05 16:57 /user/data/2019-08-05/part-2-1
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:58 /user/data/2019-08-05/part-2-2
-rw-r--r-- 3 Administrator hdfs 1661 2019-08-05 16:53 /user/data/2019-08-05/part-2-3
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:54 /user/data/2019-08-05/part-2-4
我想是因为我重启程序的时候进程中的文件没有关闭,我很困惑为什么重启后文件不会关闭,连新的文件都变成进程中了。有人可以解释一下吗?
简而言之,对于 Exactly-Once 语义。
请先阅读this post Flink 官方博客
那我就试着解释清楚吧。
BucketingSink 将所有记录写入临时文件,默认后缀为 in-progress。
当这个 sink 上的检查点时间到来时,Flink 会将正在进行的文件的名称保存到检查点;
当提交时,Flink 会将正在进行的文件重命名为最终名称,在您的示例中,它们是 part-x-x 文件。
并且当你重新启动Flink应用程序时,Flink作业将从最后一个保存点重新启动(如果你设置了参数),并且许多尚未准备好提交的正在进行的文件将被放弃,并且永远不会被读取(开始带点的将不会被 HDFS 列出)由用户。
当然,我忽略了很多细节,例如,当文件的体积超过配置时,文件将被重命名为.pending files,等等
您需要使用 flink shell 提交应用程序才能使应用程序从保存点恢复,如下所示:
./bin/flink run -s <savepointPath> ...
,查看 this 了解更多详情。
StreamingFileSink
将处理进行中的文件。
我使用flink消费kafka,并以parquet格式保存到hdfs。现在我发现我的目标目录中有很多正在进行的文件,当我重新启动我的 flink 程序时,这些文件不会作为目标目录中的文件关闭。
我的环境:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getConfig.registerTypeWithKryoSerializer(classOf[MyMessage],classOf[ProtobufSerializer])
//sinks
val bucketAssigner = new DateTimeBucketAssigner[myCounter]("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))
val streamingFileSink = StreamingFileSink.
forBulkFormat(path, ParquetAvroWriters.forSpecificRecord(classOf[myCounter]))
.withBucketCheckInterval(60000)
.withBucketAssigner(bucketAssigner).build
-rw-r--r-- 3 Administrator hdfs 1629 2019-08-05 17:06 /user/data/2019-08-05/.part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:07 /user/data/2019-08-05/.part-2-1.inprogress.ac0d8b56-b8f0-4893-9e55-5374b69f16cc
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:08 /user/data/2019-08-05/.part-2-2.inprogress.a427c2e2-d689-42b8-aa3d-77873c5654f2
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:09 /user/data/2019-08-05/.part-2-3.inprogress.b5c746e3-354d-4ab3-b1a4-8c6bd88ae430
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:59 /user/data/2019-08-05/.part-2-3.inprogress.e286d995-3fa7-4696-b51a-27378412a35c
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:00 /user/data/2019-08-05/.part-2-4.inprogress.bcde4f30-2f78-4f54-92ad-9bc54ac57c5c
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:10 /user/data/2019-08-05/.part-2-4.inprogress.dbce8a00-6514-43dc-8b31-36c5a8665d37
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 17:10 /user/data/2019-08-05/.part-2-5.inprogress.34e53418-f5af-4279-87ef-6a27549d90fe
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 17:01 /user/data/2019-08-05/.part-2-5.inprogress.936cdb63-4fe2-41bf-b839-2861030c5516
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 16:55 /user/data/2019-08-05/.part-2-6.inprogress.7a7099a6-9dcd-450b-af2c-8a676276ef0a
-rw-r--r-- 3 Administrator hdfs 0 2019-08-05 17:01 /user/data/2019-08-05/.part-2-6.inprogress.b57f548f-45fc-497c-9807-ef18dba3d11d
-rw-r--r-- 3 Administrator hdfs 1574 2019-08-05 16:56 /user/data/2019-08-05/part-2-0
-rw-r--r-- 3 Administrator hdfs 1868 2019-08-05 16:57 /user/data/2019-08-05/part-2-1
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:58 /user/data/2019-08-05/part-2-2
-rw-r--r-- 3 Administrator hdfs 1661 2019-08-05 16:53 /user/data/2019-08-05/part-2-3
-rw-r--r-- 3 Administrator hdfs 1891 2019-08-05 16:54 /user/data/2019-08-05/part-2-4
我想是因为我重启程序的时候进程中的文件没有关闭,我很困惑为什么重启后文件不会关闭,连新的文件都变成进程中了。有人可以解释一下吗?
简而言之,对于 Exactly-Once 语义。
请先阅读this post Flink 官方博客
那我就试着解释清楚吧。
BucketingSink 将所有记录写入临时文件,默认后缀为 in-progress。
当这个 sink 上的检查点时间到来时,Flink 会将正在进行的文件的名称保存到检查点;
当提交时,Flink 会将正在进行的文件重命名为最终名称,在您的示例中,它们是 part-x-x 文件。
并且当你重新启动Flink应用程序时,Flink作业将从最后一个保存点重新启动(如果你设置了参数),并且许多尚未准备好提交的正在进行的文件将被放弃,并且永远不会被读取(开始带点的将不会被 HDFS 列出)由用户。
当然,我忽略了很多细节,例如,当文件的体积超过配置时,文件将被重命名为.pending files,等等
您需要使用 flink shell 提交应用程序才能使应用程序从保存点恢复,如下所示:
./bin/flink run -s <savepointPath> ...
,查看 this 了解更多详情。
StreamingFileSink
将处理进行中的文件。