如何在 CRON 驱动的 DetectDuplicate 中摄取所有流文件?
How to ingest all the flowfiles in a CRON driven DetectDuplicate?
在 NiFi 中,我有一个 cron 驱动的处理器序列,每天提供一组流文件,其中包含我感兴趣的 2 个属性:product_code
和 publication_date
。
我的需要是每个 product_code
只保留一个流文件:最新的 publication_date
.
例如:
对于此输入:
flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03
预期的输出应该是:
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03
我测试的算法
- 使用
UpdateAttribute
处理器根据 publication_date
. 向每个流文件添加属性 priority
- 这些更新的流文件被重定向到
PriorityAttributePrioritizer
队列。
- 流文件留在这个队列中,因为只有一个消耗处理器,它是 cron 驱动的。通过这种方式,我确定队列中的流文件是根据
publication_date
. 排序的
- 然后 CRON 触发下一个处理器,一个基于
product_code
属性的 DetectDuplicate
。由于流文件是从最近的项目到最旧的项目处理的,我确信当 product_code
被检测为重复时,这是因为相同的 product_code
已经可以使用更新的 publication_date
.
问题
遗憾的是,当 cron 触发 DetectDuplicate
处理器时,只有一条消息被消耗,其他消息留在队列中。
如果我将 "Scheduling strategy" 更改为 "Timer driven" 并且 "Run schedule" 为 0,我的所有流文件都将被消耗并且输出是预期的。
有没有办法让我的 DetectDuplicate
处理器在开始工作时消耗队列中的所有消息(而不是只有一条消息)?
或者有没有办法设置像"Start to work at 2:00 AM and stop at 4:00 AM"这样的调度策略?
您认为有更好的策略来满足需求吗?
此致,
价值
更新 1
(2018-04-13) 更多信息,以及 Bryan Bende 的评论。
我知道 CRON 不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。
在我的例子中,排队等待重复数据删除的流文件是通过一系列 3 个 REST 调用生成的:
- 第一次调用 "GetAllCategories",
- 然后对于每个类别,调用 "GetSubCategories"、
- 并为每个子类别调用 "GetProducts"。
这个流文件生成部分通常持续 5 分钟左右:昨晚第一个流文件在 2:00:16 上午到达队列,最后一个在 2:04:58 上午到达队列。 (这就是为什么我将 DetectDuplicate
安排在 运行 上午 3:00。)
如果我的 DetectDuplicate
处理器将被 "Timer driven" 调度,则到达队列的第一个 flowFiles 将在所有 flowFiles 到达那里之前被处理器消耗。
这会破坏整套流文件的顺序。
我觉得在 DetectDuplicate
处理器开始工作之前我必须等待所有流文件都在队列中。
你有什么建议可以改进我的算法吗?
您通常应该为启动流程的源处理器使用 CRON 调度,然后所有其他处理器应该使用 运行 调度 0 的定时器驱动。
例如,如果您每天在 2:00 AM 从目录中获取文件,则应使用 CRON 表达式安排 GetFile 在 2:00 AM 开始流程,但不能超过需要 CRON 调度,因为除非 GetFile 运行,否则它们永远不会接收数据。
在您希望处理器等待所有流文件可用之前执行的情况下,您可以使用 Wait/Notify 处理器,这样所有流文件都会在在被释放到 DetectDuplicate 处理器之前等待处理器。
只有一条消息被消耗的原因是当您在所有处理器中启用了 CRON
调度时 - 源和 consuming/dowstream 处理器,它执行如下:
Ex: 您已经在所有处理器中设置了每天下午 2 点 运行 的 CRON 计划,因此在触发期间它将使用其上游处理器 ex: GetFile
在下午 2 点,其余流文件将在队列中,下一个流文件只会在第二天下午 2 点被消耗,依此类推。这适用于更下游的处理器,这意味着,它们也将在每天下午 2 点一次只消耗流文件,这本质上是一场正在酝酿中的灾难。谁希望处理速度缓慢?
这就是您必须遵循@Bryan 提到的方法的原因。流管道应该只有它的源处理器 CRON driven
,其余的处理器应该是 Timer driven
和你希望的 运行 时间表,但通常 0 sec
用于使用流文件。
在 NiFi 中,我有一个 cron 驱动的处理器序列,每天提供一组流文件,其中包含我感兴趣的 2 个属性:product_code
和 publication_date
。
我的需要是每个 product_code
只保留一个流文件:最新的 publication_date
.
例如:
对于此输入:
flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03
预期的输出应该是:
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03
我测试的算法
- 使用
UpdateAttribute
处理器根据publication_date
. 向每个流文件添加属性 - 这些更新的流文件被重定向到
PriorityAttributePrioritizer
队列。 - 流文件留在这个队列中,因为只有一个消耗处理器,它是 cron 驱动的。通过这种方式,我确定队列中的流文件是根据
publication_date
. 排序的
- 然后 CRON 触发下一个处理器,一个基于
product_code
属性的DetectDuplicate
。由于流文件是从最近的项目到最旧的项目处理的,我确信当product_code
被检测为重复时,这是因为相同的product_code
已经可以使用更新的publication_date
.
priority
问题
遗憾的是,当 cron 触发 DetectDuplicate
处理器时,只有一条消息被消耗,其他消息留在队列中。
如果我将 "Scheduling strategy" 更改为 "Timer driven" 并且 "Run schedule" 为 0,我的所有流文件都将被消耗并且输出是预期的。
有没有办法让我的 DetectDuplicate
处理器在开始工作时消耗队列中的所有消息(而不是只有一条消息)?
或者有没有办法设置像"Start to work at 2:00 AM and stop at 4:00 AM"这样的调度策略?
您认为有更好的策略来满足需求吗?
此致,
价值
更新 1
(2018-04-13) 更多信息,以及 Bryan Bende 的评论。
我知道 CRON 不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。
在我的例子中,排队等待重复数据删除的流文件是通过一系列 3 个 REST 调用生成的:
- 第一次调用 "GetAllCategories",
- 然后对于每个类别,调用 "GetSubCategories"、
- 并为每个子类别调用 "GetProducts"。
这个流文件生成部分通常持续 5 分钟左右:昨晚第一个流文件在 2:00:16 上午到达队列,最后一个在 2:04:58 上午到达队列。 (这就是为什么我将 DetectDuplicate
安排在 运行 上午 3:00。)
如果我的 DetectDuplicate
处理器将被 "Timer driven" 调度,则到达队列的第一个 flowFiles 将在所有 flowFiles 到达那里之前被处理器消耗。
这会破坏整套流文件的顺序。
我觉得在 DetectDuplicate
处理器开始工作之前我必须等待所有流文件都在队列中。
你有什么建议可以改进我的算法吗?
您通常应该为启动流程的源处理器使用 CRON 调度,然后所有其他处理器应该使用 运行 调度 0 的定时器驱动。
例如,如果您每天在 2:00 AM 从目录中获取文件,则应使用 CRON 表达式安排 GetFile 在 2:00 AM 开始流程,但不能超过需要 CRON 调度,因为除非 GetFile 运行,否则它们永远不会接收数据。
在您希望处理器等待所有流文件可用之前执行的情况下,您可以使用 Wait/Notify 处理器,这样所有流文件都会在在被释放到 DetectDuplicate 处理器之前等待处理器。
只有一条消息被消耗的原因是当您在所有处理器中启用了 CRON
调度时 - 源和 consuming/dowstream 处理器,它执行如下:
Ex: 您已经在所有处理器中设置了每天下午 2 点 运行 的 CRON 计划,因此在触发期间它将使用其上游处理器 ex: GetFile
在下午 2 点,其余流文件将在队列中,下一个流文件只会在第二天下午 2 点被消耗,依此类推。这适用于更下游的处理器,这意味着,它们也将在每天下午 2 点一次只消耗流文件,这本质上是一场正在酝酿中的灾难。谁希望处理速度缓慢?
这就是您必须遵循@Bryan 提到的方法的原因。流管道应该只有它的源处理器 CRON driven
,其余的处理器应该是 Timer driven
和你希望的 运行 时间表,但通常 0 sec
用于使用流文件。