使用 NiFi Flowfiles 作为事件通知器
Using NiFi Flowfiles as an Event Notifier
NiFi 新手!
我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示一种类型的事件已经开始。
在 NiFi 中有没有其他方法可以让我指示一组事件已经开始和结束?例如,如果我有三个处理器读取数据,我想知道第一个处理器即将被触发,最后一个处理器已经完成。无论如何我可以这样做吗?如果处理器继续 运行,我希望能够一次性将从处理器 1 读取的数据分组到处理器 3。为了更清楚地说明这一点
Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...
如有任何帮助,我们将不胜感激,
提前致谢!
我要把这个答案分成几个部分,因为这里有很多事情要做。
I was wondering if there is a way to send an empty flowfile with
attributes on the flowfile in NiFi? I'd like to use this as a trigger
to indicate that a type of event has Started.
GenerateFlowFile
processor allows you to send an empty (or populated) flowfile at a regular run schedule or using CRON scheduling. You can combine this with the UpdateAttribute
处理器向流文件添加任意静态或动态属性。
In NiFi is there any other way for me to indicate that a set of events
have started and finished? For instance, if i have three processors
that read in data and i would like to know that the first processor is
about to be triggered and that the last processor has finished. Is
there anyway for me to do this?
这接近于批处理,而 Apache NiFi 并未针对批处理进行设计或优化。要确定 source 处理器是 "about to be triggered" 是非常困难的。如果该处理器是在 timer/CRON 基础上触发的,您可以知道该时间,但如果您的意思是“GetFile
即将成功检索文件”之类的话,那可不容易做到。可以使用您自己的自定义处理器扩展处理器并覆盖 onTrigger()
方法以在 DistributedMapCacheClientService
that another processor can pick up on. Or I guess you could wrap the logic in an ExecuteScript
处理器中存储一些值并编写自定义通知代码。我不确定这里的 target -- 谁 会收到此状态更改的通知?它是另一个处理器、人类观察者还是外部服务?
If the processors continue to run, i would like to be able to group
the data read from processor 1 to processor 3 in one pass. To make
this more clear
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2
Processor3 End ...
但是,我相信使用新的 Wait
and Notify
processors. Koji Kawamura has written a good article describing their use here 可以实现您的要求。
我认为在这种情况下,您需要特殊的内容或属性才能检测通过系统的批次,除非它一次是一个数据单元。我将尝试在下面描述两种情况,但我对此没有太多背景。
场景一(单个数据单元)
请随意替换不同的源处理器,但为了简单起见,我使用 GetFile
。
假设您有一个充满文本文件的目录(由某些外部进程放置在那里)。每个文件都包含 "Firstname Lastname" 形式的文本,并被命名为 Lastname_YYYY-MM-DD-HH-mm-ss.txt
,并在文件名中写入时间戳。
GetFile -> ReplaceText -> PutFile
GetFile
处理器会将每个文件作为单独的流文件引入。从那里,ReplaceText
可以做一些简单的事情,比如使用正则表达式来切换名称的顺序,然后 PutFile
将内容写回文件系统。当 GetFile
第一次触发时,它会分派 n 个流文件到 connection/queue 到 ReplaceText
。如果你想让它等待并线性地而不是并行地执行操作,你可以将 success 队列的背压设置为 1
flowfile 以防止前面的处理器(GetFile
) 从 运行 直到队列再次为空。
场景二(多个flowfile必须组合在一起,连动操作)
在这里,您需要使用 MergeContent
将多个流文件收集到一个流文件中。您可以将 bin 阈值设置为 n 流文件,并且 MergeContent
处理器将仅在达到最小数量时传输 success 流文件传入的流文件。您还可以按属性分箱,因此如果您从异构输入源读取数据,您仍然可以根据共同特征关联相关的数据片段。
替代方案 Wait
& Notify
此外,您可以使用 Notify
处理器将触发流文件发送到相应的 Wait
处理器,以 "release" "content" 流文件到达它们所需的目的地。同样,上面链接的 Koji 的文章通过示例流程和大量屏幕截图详细解释了这一点。
我希望这至少能给你一个遵循的方向。在没有更多上下文的情况下,我仍然觉得您正在尝试在这里解决非 NiFi 问题,或者可以调整您的数据流模型以更好地支持流媒体心态。如果您有更多信息,我很乐意扩展答案。
NiFi 新手!
我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示一种类型的事件已经开始。
在 NiFi 中有没有其他方法可以让我指示一组事件已经开始和结束?例如,如果我有三个处理器读取数据,我想知道第一个处理器即将被触发,最后一个处理器已经完成。无论如何我可以这样做吗?如果处理器继续 运行,我希望能够一次性将从处理器 1 读取的数据分组到处理器 3。为了更清楚地说明这一点
Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...
如有任何帮助,我们将不胜感激, 提前致谢!
我要把这个答案分成几个部分,因为这里有很多事情要做。
I was wondering if there is a way to send an empty flowfile with attributes on the flowfile in NiFi? I'd like to use this as a trigger to indicate that a type of event has Started.
GenerateFlowFile
processor allows you to send an empty (or populated) flowfile at a regular run schedule or using CRON scheduling. You can combine this with the UpdateAttribute
处理器向流文件添加任意静态或动态属性。
In NiFi is there any other way for me to indicate that a set of events have started and finished? For instance, if i have three processors that read in data and i would like to know that the first processor is about to be triggered and that the last processor has finished. Is there anyway for me to do this?
这接近于批处理,而 Apache NiFi 并未针对批处理进行设计或优化。要确定 source 处理器是 "about to be triggered" 是非常困难的。如果该处理器是在 timer/CRON 基础上触发的,您可以知道该时间,但如果您的意思是“GetFile
即将成功检索文件”之类的话,那可不容易做到。可以使用您自己的自定义处理器扩展处理器并覆盖 onTrigger()
方法以在 DistributedMapCacheClientService
that another processor can pick up on. Or I guess you could wrap the logic in an ExecuteScript
处理器中存储一些值并编写自定义通知代码。我不确定这里的 target -- 谁 会收到此状态更改的通知?它是另一个处理器、人类观察者还是外部服务?
If the processors continue to run, i would like to be able to group the data read from processor 1 to processor 3 in one pass. To make this more clear
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...
但是,我相信使用新的 Wait
and Notify
processors. Koji Kawamura has written a good article describing their use here 可以实现您的要求。
我认为在这种情况下,您需要特殊的内容或属性才能检测通过系统的批次,除非它一次是一个数据单元。我将尝试在下面描述两种情况,但我对此没有太多背景。
场景一(单个数据单元)
请随意替换不同的源处理器,但为了简单起见,我使用 GetFile
。
假设您有一个充满文本文件的目录(由某些外部进程放置在那里)。每个文件都包含 "Firstname Lastname" 形式的文本,并被命名为 Lastname_YYYY-MM-DD-HH-mm-ss.txt
,并在文件名中写入时间戳。
GetFile -> ReplaceText -> PutFile
GetFile
处理器会将每个文件作为单独的流文件引入。从那里,ReplaceText
可以做一些简单的事情,比如使用正则表达式来切换名称的顺序,然后 PutFile
将内容写回文件系统。当 GetFile
第一次触发时,它会分派 n 个流文件到 connection/queue 到 ReplaceText
。如果你想让它等待并线性地而不是并行地执行操作,你可以将 success 队列的背压设置为 1
flowfile 以防止前面的处理器(GetFile
) 从 运行 直到队列再次为空。
场景二(多个flowfile必须组合在一起,连动操作)
在这里,您需要使用 MergeContent
将多个流文件收集到一个流文件中。您可以将 bin 阈值设置为 n 流文件,并且 MergeContent
处理器将仅在达到最小数量时传输 success 流文件传入的流文件。您还可以按属性分箱,因此如果您从异构输入源读取数据,您仍然可以根据共同特征关联相关的数据片段。
替代方案 Wait
& Notify
此外,您可以使用 Notify
处理器将触发流文件发送到相应的 Wait
处理器,以 "release" "content" 流文件到达它们所需的目的地。同样,上面链接的 Koji 的文章通过示例流程和大量屏幕截图详细解释了这一点。
我希望这至少能给你一个遵循的方向。在没有更多上下文的情况下,我仍然觉得您正在尝试在这里解决非 NiFi 问题,或者可以调整您的数据流模型以更好地支持流媒体心态。如果您有更多信息,我很乐意扩展答案。