使用 ConsumeKafka 处理器时创建更大的 NiFi 流文件
Creating larger NiFi flow files when using the ConsumeKafka processor
我创建了一个简单的 NiFi 管道,它从 Kafka 主题读取数据流(使用 ConsumeKafka
)并将其写入 HDFS(使用 PutHDFS
)。目前,我看到在 HDFS 上创建了许多小文件。大约每秒创建一个新文件,有些文件只有一条或两条记录。
我希望将更少、更大的文件写入 HDFS。
我在ConsumeKafka
中有以下设置:
Message Demarcator = <new line>
Max Poll Records = 10000
Max Uncommitted Time = 20s
过去我使用 Flume 而不是 Nifi,它有 batchSize
和 batchDurationMillis
,这让我可以调整 HDFS 文件的大小。 Nifi 中的 ConsumeKafka
似乎缺少 batchDurationMillis
等价物。
NiFi 中的解决方案是什么?
使用消息分界符和最大轮询记录是为每个流文件获取多条消息的正确方法。您可能希望通过将 运行 计划(在计划选项卡上)从 0 秒(这意味着 运行 尽可能快地调整为 1 秒或任何对您有意义的时间来减慢 ConsumeKafka 处理器的速度获取更多数据。
即使有上述内容,您可能仍希望在 PutHDFS 之前安装一个 MergeContent 处理器,并根据大小将流文件合并在一起,这样您就可以等到拥有适当数量的数据后再写入 HDFS。
如何使用 MergeContent 将取决于您要合并的数据类型...如果您有 Avro,则有针对 Avro 的特定合并策略。如果您有 JSON,您可以将它们一个接一个地合并,或者您可以用页眉、页脚和分界符将它们包裹起来,形成一个有效的 JSON 数组。
我创建了一个简单的 NiFi 管道,它从 Kafka 主题读取数据流(使用 ConsumeKafka
)并将其写入 HDFS(使用 PutHDFS
)。目前,我看到在 HDFS 上创建了许多小文件。大约每秒创建一个新文件,有些文件只有一条或两条记录。
我希望将更少、更大的文件写入 HDFS。
我在ConsumeKafka
中有以下设置:
Message Demarcator = <new line>
Max Poll Records = 10000
Max Uncommitted Time = 20s
过去我使用 Flume 而不是 Nifi,它有 batchSize
和 batchDurationMillis
,这让我可以调整 HDFS 文件的大小。 Nifi 中的 ConsumeKafka
似乎缺少 batchDurationMillis
等价物。
NiFi 中的解决方案是什么?
使用消息分界符和最大轮询记录是为每个流文件获取多条消息的正确方法。您可能希望通过将 运行 计划(在计划选项卡上)从 0 秒(这意味着 运行 尽可能快地调整为 1 秒或任何对您有意义的时间来减慢 ConsumeKafka 处理器的速度获取更多数据。
即使有上述内容,您可能仍希望在 PutHDFS 之前安装一个 MergeContent 处理器,并根据大小将流文件合并在一起,这样您就可以等到拥有适当数量的数据后再写入 HDFS。
如何使用 MergeContent 将取决于您要合并的数据类型...如果您有 Avro,则有针对 Avro 的特定合并策略。如果您有 JSON,您可以将它们一个接一个地合并,或者您可以用页眉、页脚和分界符将它们包裹起来,形成一个有效的 JSON 数组。