NFS(Netapp 服务器)-> Flink ->s3
NFS (Netapp server)-> Flink ->s3
我是 flink 的新手 (java),正在尝试将 netapp 文件服务器上的 xml 文件作为文件路径安装到安装了 flink 的服务器上。
如何实时进行批处理或流处理以获取进入文件夹的文件并使用 s3 接收它。
我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来收听文件夹和管理检查点/保存点?
如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。或许sync是合适的。
假设使用 Flink 有意义(例如,因为你想对数据执行一些有状态的转换),那么你的所有任务管理器(工作人员)都需要能够访问这些文件使用相同的 URI 处理。为此,您可以使用 file:// URI。
您可以执行类似这样的操作来监视目录并在新文件出现时提取它们:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());
注意来自 documentation 的警告:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
这意味着您应该自动将准备好接收的文件移动到正在观看的文件夹中。
您可以使用 Streaming File Sink 写入 S3。 Flink 的写操作,例如 writeUsingOutputFormat()
,不参与检查点,所以在这种情况下这不是一个好的选择。
这个问题的完整工作代码在下面link。您需要启用检查点才能将 .inprogress 文件移动到实际文件
// 每 1000 毫秒启动一个检查点
env.enableCheckpointing(1000);
我是 flink 的新手 (java),正在尝试将 netapp 文件服务器上的 xml 文件作为文件路径安装到安装了 flink 的服务器上。
如何实时进行批处理或流处理以获取进入文件夹的文件并使用 s3 接收它。
我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来收听文件夹和管理检查点/保存点?
如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。或许sync是合适的。
假设使用 Flink 有意义(例如,因为你想对数据执行一些有状态的转换),那么你的所有任务管理器(工作人员)都需要能够访问这些文件使用相同的 URI 处理。为此,您可以使用 file:// URI。
您可以执行类似这样的操作来监视目录并在新文件出现时提取它们:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());
注意来自 documentation 的警告:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
这意味着您应该自动将准备好接收的文件移动到正在观看的文件夹中。
您可以使用 Streaming File Sink 写入 S3。 Flink 的写操作,例如 writeUsingOutputFormat()
,不参与检查点,所以在这种情况下这不是一个好的选择。
这个问题的完整工作代码在下面link。您需要启用检查点才能将 .inprogress 文件移动到实际文件
// 每 1000 毫秒启动一个检查点 env.enableCheckpointing(1000);