NiFi - 更新处理器中的 Luwak (Lucene) 索引

NiFi - Update Luwak (Lucene) Indexes in Processor

我正在尝试使用 Luwak Lucene 索引器创建自定义处理器,因此我可以 运行 查询传入的流文件。我想弄清楚的是更新 Luwak 监视器中存在的查询索引的最佳方法(下面的示例代码)。

编辑 - 更多使用上下文

通过更新,我的意思是允许外部用户添加/更新/删除正在 运行 针对传入流文件的查询。我们将从一组固定的查询开始,然后希望允许一个或多个用户能够更改针对传入消息执行的查询。挑战就在这里,改变正在执行的查询。

我还应该考虑其他选择吗?如果有 10k,更新查询似乎需要大约 20 秒。这很可能很少见,但我正在考虑重新加载/启动时间。

我考虑过的选项:

  1. 使用 UpdateAttribute 并更新每个流文件。不理想,特别是如果有一堆查询要索引。
  2. 使用http、AWS SQS等方式发送一个高优先级的流文件进行更新(比其他任何来源都高)。不可怕,但似乎仍然不对。
  3. 使用 NiFi API 在更新时启动/停止处理器。执行更新似乎不是一种非常有效的方法,尤其是当它们发生得相当频繁时。

实例化监视器:

Monitor monitor = new Monitor(new LuceneQueryParser("field"), new TermFilteredPresearcher());

添加查询 - 我要优化的内容:

        //Add queries to the monitor
        for (Map.Entry<String, String> entry : bucketList.entrySet()) {
            MonitorQuery q = new MonitorQuery(entry.getKey(),entry.getValue());
            monitor.update(q);
        }

当您的处理器启动时,您可以启动一个后台计时器线程,该线程会定期构建一个新监视器,然后替换处理器正在使用的监视器。

您可能希望在您的处理器中创建一个成员变量,例如:

AtomicReference<Monitor> monitorHolder = new AtomicReference<Monitor>();

然后在@OnScheduled 中您可以构建初始监视器并将其设置在支架中。

然后在 onTrigger 中你总是首先得到 Monitor:

Monitor localMonitor = monitorHolder.get();

然后在后台线程中可以调用monitorHolder.set(newMonitor),不会影响当前处理器的执行,但会在下次调用onTrigger时生效。