使用 Nifi 预处理大文件
Preproces a large file using Nifi
我们有高达 8GB 的包含结构化内容的文件,但重要的元数据存储在文件的最后一行,需要附加到每一行内容。使用 ReverseFileReader 获取最后一行很容易,但这需要文件在磁盘上是静态的,而我无法在我们现有的 Nifi 流程中找到执行此操作的方法?这在数据流式传输到内容存储库之前可能吗?
在 Nifi 中处理 8 GB 的文件可能效率低下。您可以尝试其他选项:-
ListSFTP --> ExecuteSparkInteractive --> RouteOnAttributes ----> ....
在这里,您不需要实际通过 Nifi 传输数据,只需在 nifi 属性中传递文件位置(可以是 hdfs 或 non-hdfs 位置)并编写 pyspark 或 spark scala 代码来读取该文件(您可以通过 ExecuteSparkInteractive 运行 此代码)。代码将在 spark 集群上执行,只有作业结果将被发送回 Nifi,您可以进一步使用它来路由您的 nifi 流(使用 RouteOnAttribute 处理器)。
注意:您需要将 Livy 设置为 运行 来自 Nifi 的 spark 代码。
希望这对您有所帮助。
我们有高达 8GB 的包含结构化内容的文件,但重要的元数据存储在文件的最后一行,需要附加到每一行内容。使用 ReverseFileReader 获取最后一行很容易,但这需要文件在磁盘上是静态的,而我无法在我们现有的 Nifi 流程中找到执行此操作的方法?这在数据流式传输到内容存储库之前可能吗?
在 Nifi 中处理 8 GB 的文件可能效率低下。您可以尝试其他选项:-
ListSFTP --> ExecuteSparkInteractive --> RouteOnAttributes ----> ....
在这里,您不需要实际通过 Nifi 传输数据,只需在 nifi 属性中传递文件位置(可以是 hdfs 或 non-hdfs 位置)并编写 pyspark 或 spark scala 代码来读取该文件(您可以通过 ExecuteSparkInteractive 运行 此代码)。代码将在 spark 集群上执行,只有作业结果将被发送回 Nifi,您可以进一步使用它来路由您的 nifi 流(使用 RouteOnAttribute 处理器)。
注意:您需要将 Livy 设置为 运行 来自 Nifi 的 spark 代码。
希望这对您有所帮助。