在 NiFi 中安排批处理 SQL

Schedule batch SQL in NiFi

我正在使用 NiFi 连接 2 个系统:

我需要减少 Kafka 主题中的 JSON 并将它们推送到适当的表中。这样做没有什么大问题,但是......源系统生成了太多事件,目标数据库为每次修改触发了进程。并且尺寸不足以处理那么多进程。

所以我在我的数据库中进行批量更新,使用 Text Processor + Update Attribute Processor + ReplaceText Processor 后面的 PutSQL Processor (例如此处所示:https://community.hortonworks.com/articles/91849/design-nifi-flow-for-using-putsql-processor-to-per.html).

但是这个工作流程允许我根据要放入其中的元素数量(我的批量大小)更新我的数据库。

我想定期批量更新。原因是源事件不是线性发生的,目标数据库不能接受超过 5 分钟"away" 来自源头。所以我需要安排我的批量更新最差每 5 分钟一次。

我现在不知道该怎么做。你能告诉我你想要哪个吗processors/solution?

PS:当然,存在大量更好的解决方案,比如在我的目标数据库中每次提交时不触发繁重的进程,但是改变这个 "good old system" 是负担不起的吧现在。

干杯, 奥利维尔

我建议串联使用 WaitNotify 处理器来设置一个 "gate" 将流文件保存在队列中,直到 Notify 处理器(与运行 约 5 分钟的时间表)发送 "trigger" 流文件。 Koji Kawamura 写了an extensive article documenting this behavior pattern

嗯...答案确实很简单。您只需要进入处理器的 "Schedule" 选项卡。我现在 运行 1.6.0-SNAPSHOT(顺便说一下,这个选项似乎已经存在很长时间了......我只是没有注意到它)并且它为调度提供了设置的能力一个 Cron 调度程序。完美满足需求...