Flink 文件 Sink 中的容错
Fault tolerance in Flink file Sink
我在集群模式下使用 Flink 流式传输与 Kafka 消费者连接器 (FlinkKafkaConsumer) 和文件接收器 (StreamingFileSink),策略只有一次。
文件接收器将文件写入本地磁盘。
我注意到,如果作业失败并自动重启,任务管理器会查找上次失败作业的剩余文件(隐藏文件)。
显然,由于可以将任务分配给不同的任务管理器,这导致了一次又一次的失败。
到目前为止,我找到的唯一解决方案是删除隐藏文件并重新提交作业。
如果我做对了(如果我错了请纠正我),隐藏文件中的事件不会提交给 bootstrap-服务器,因此不会丢失数据。
有没有办法,强制Flink忽略已经写入的文件?或者也许有更好的方法来实现解决方案(也许以某种方式使用保存点)?
我在Flink邮件列表中得到了非常详细的回答。 TLDR,为了只执行一次,我必须使用某种分布式文件系统。
完整答案:
本地文件系统不是您想要实现的目标的正确选择。我认为您无法在此设置中实现真正的恰好一次策略。让我详细说明原因。
有趣的是它在检查点上的表现。该行为由 RollingPolicy 控制。由于您没有说明您使用的是什么格式,我们假设您首先使用行格式。对于行格式,默认滚动策略(何时将文件从进行中更改为挂起)是在文件达到 128MB、文件早于 60 秒或 60 秒未写入时滚动。它不会在检查点滚动。此外,StreamingFileSink 将文件系统视为可在恢复后访问的持久接收器。这意味着它会在从 checkpoint/savepoint.
恢复时尝试附加到此文件
即使您在每个检查点滚动文件,您仍然可能会遇到一些问题,因为 StreamingFileSink 在检查点完成后将文件从挂起移动到完成。如果在完成检查点和移动文件之间发生故障,它将无法在恢复后移动它们(如果有访问权限它会这样做)。
最后,完成的检查点将包含已成功端到端处理的记录的偏移量,这意味着 StreamingFileSink 假定已提交的记录。这可以是写入一个正在进行的文件的记录,在 StreamingFileSink 检查点元数据中有一个指针,在 "pending" 文件中记录,在 StreamingFileSink 检查点元数据中有一个条目表明这个文件已经完成,或者在 "finished" 中记录] 文件。[1]
因此,如您所见,有多种情况下 StreamingFileSink 必须在重启后访问文件。
最后一件事,你提到了"committing to the "bootstrap-server”。请记住,Flink 不使用提交回 Kafka 的偏移量来保证一致性。它可以写回这些偏移量,但只能出于 monitoring/debugging 目的。Flink stores/restores 从其检查点处理的偏移量。[3]
如果有帮助,请告诉我。我尽力了 ;) 顺便说一句,我非常鼓励阅读链接的资源,因为它们试图以更有条理的方式描述所有这些。
我也在抄送 Kostas,他比我更了解 StreamingFileSink。所以他也许可以在某个地方纠正我。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
[3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
我在集群模式下使用 Flink 流式传输与 Kafka 消费者连接器 (FlinkKafkaConsumer) 和文件接收器 (StreamingFileSink),策略只有一次。 文件接收器将文件写入本地磁盘。 我注意到,如果作业失败并自动重启,任务管理器会查找上次失败作业的剩余文件(隐藏文件)。 显然,由于可以将任务分配给不同的任务管理器,这导致了一次又一次的失败。 到目前为止,我找到的唯一解决方案是删除隐藏文件并重新提交作业。 如果我做对了(如果我错了请纠正我),隐藏文件中的事件不会提交给 bootstrap-服务器,因此不会丢失数据。
有没有办法,强制Flink忽略已经写入的文件?或者也许有更好的方法来实现解决方案(也许以某种方式使用保存点)?
我在Flink邮件列表中得到了非常详细的回答。 TLDR,为了只执行一次,我必须使用某种分布式文件系统。
完整答案:
本地文件系统不是您想要实现的目标的正确选择。我认为您无法在此设置中实现真正的恰好一次策略。让我详细说明原因。 有趣的是它在检查点上的表现。该行为由 RollingPolicy 控制。由于您没有说明您使用的是什么格式,我们假设您首先使用行格式。对于行格式,默认滚动策略(何时将文件从进行中更改为挂起)是在文件达到 128MB、文件早于 60 秒或 60 秒未写入时滚动。它不会在检查点滚动。此外,StreamingFileSink 将文件系统视为可在恢复后访问的持久接收器。这意味着它会在从 checkpoint/savepoint.
恢复时尝试附加到此文件即使您在每个检查点滚动文件,您仍然可能会遇到一些问题,因为 StreamingFileSink 在检查点完成后将文件从挂起移动到完成。如果在完成检查点和移动文件之间发生故障,它将无法在恢复后移动它们(如果有访问权限它会这样做)。
最后,完成的检查点将包含已成功端到端处理的记录的偏移量,这意味着 StreamingFileSink 假定已提交的记录。这可以是写入一个正在进行的文件的记录,在 StreamingFileSink 检查点元数据中有一个指针,在 "pending" 文件中记录,在 StreamingFileSink 检查点元数据中有一个条目表明这个文件已经完成,或者在 "finished" 中记录] 文件。[1]
因此,如您所见,有多种情况下 StreamingFileSink 必须在重启后访问文件。
最后一件事,你提到了"committing to the "bootstrap-server”。请记住,Flink 不使用提交回 Kafka 的偏移量来保证一致性。它可以写回这些偏移量,但只能出于 monitoring/debugging 目的。Flink stores/restores 从其检查点处理的偏移量。[3]
如果有帮助,请告诉我。我尽力了 ;) 顺便说一句,我非常鼓励阅读链接的资源,因为它们试图以更有条理的方式描述所有这些。 我也在抄送 Kostas,他比我更了解 StreamingFileSink。所以他也许可以在某个地方纠正我。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html [3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration