如何在 CRON 驱动的 DetectDuplicate 中摄取所有流文件?

How to ingest all the flowfiles in a CRON driven DetectDuplicate?

在 NiFi 中,我有一个 cron 驱动的处理器序列,每天提供一组流文件,其中包含我感兴趣的 2 个属性:product_codepublication_date

我的需要是每个 product_code 只保留一个流文件:最新的 publication_date.

例如:

对于此输入:

flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03

预期的输出应该是:

flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03

我测试的算法

  1. 使用 UpdateAttribute 处理器根据 publication_date.
  2. 向每个流文件添加属性 priority
  3. 这些更新的流文件被重定向到 PriorityAttributePrioritizer 队列。
  4. 流文件留在这个队列中,因为只有一个消耗处理器,它是 cron 驱动的。通过这种方式,我确定队列中的流文件是根据 publication_date.
  5. 排序的
  6. 然后 CRON 触发下一个处理器,一个基于 product_code 属性的 DetectDuplicate。由于流文件是从最近的项目到最旧的项目处理的,我确信当 product_code 被检测为重复时,这是因为相同的 product_code 已经可以使用更新的 publication_date.

问题

遗憾的是,当 cron 触发 DetectDuplicate 处理器时,只有一条消息被消耗,其他消息留在队列中。

如果我将 "Scheduling strategy" 更改为 "Timer driven" 并且 "Run schedule" 为 0,我的所有流文件都将被消耗并且输出是预期的。

有没有办法让我的 DetectDuplicate 处理器在开始工作时消耗队列中的所有消息(而不是只有一条消息)?

或者有没有办法设置像"Start to work at 2:00 AM and stop at 4:00 AM"这样的调度策略?

您认为有更好的策略来满足需求吗?

此致,

价值


更新 1

(2018-04-13) 更多信息,以及 Bryan Bende 的评论。

我知道 CRON 不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。

在我的例子中,排队等待重复数据删除的流文件是通过一系列 3 个 REST 调用生成的:

这个流文件生成部分通常持续 5 分钟左右:昨晚第一个流文件在 2:00:16 上午到达队列,最后一个在 2:04:58 上午到达队列。 (这就是为什么我将 DetectDuplicate 安排在 运行 上午 3:00。)

如果我的 DetectDuplicate 处理器将被 "Timer driven" 调度,则到达队列的第一个 flowFiles 将在所有 flowFiles 到达那里之前被处理器消耗。

这会破坏整套流文件的顺序。

我觉得在 DetectDuplicate 处理器开始工作之前我必须等待所有流文件都在队列中。

你有什么建议可以改进我的算法吗?

您通常应该为启动流程的源处理器使用 CRON 调度,然后所有其他处理器应该使用 运行 调度 0 的定时器驱动。

例如,如果您每天在 2:00 AM 从目录中获取文件,则应使用 CRON 表达式安排 GetFile 在 2:00 AM 开始流程,但不能超过需要 CRON 调度,因为除非 GetFile 运行,否则它们永远不会接收数据。

在您希望处理器等待所有流文件可用之前执行的情况下,您可以使用 Wait/Notify 处理器,这样所有流文件都会在在被释放到 DetectDuplicate 处理器之前等待处理器。

只有一条消息被消耗的原因是当您在所有处理器中启用了 CRON 调度时 - 源和 consuming/dowstream 处理器,它执行如下:

Ex: 您已经在所有处理器中设置了每天下午 2 点 运行 的 CRON 计划,因此在触发期间它将使用其上游处理器 ex: GetFile 在下午 2 点,其余流文件将在队列中,下一个流文件只会在第二天下午 2 点被消耗,依此类推。这适用于更下游的处理器,这意味着,它们也将在每天下午 2 点一次只消耗流文件,这本质上是一场正在酝酿中的灾难。谁希望处理速度缓慢?

这就是您必须遵循@Bryan 提到的方法的原因。流管道应该只有它的源处理器 CRON driven,其余的处理器应该是 Timer driven 和你希望的 运行 时间表,但通常 0 sec 用于使用流文件。