NiFi - 更新处理器中的 Luwak (Lucene) 索引
NiFi - Update Luwak (Lucene) Indexes in Processor
我正在尝试使用 Luwak Lucene 索引器创建自定义处理器,因此我可以 运行 查询传入的流文件。我想弄清楚的是更新 Luwak 监视器中存在的查询索引的最佳方法(下面的示例代码)。
编辑 - 更多使用上下文
通过更新,我的意思是允许外部用户添加/更新/删除正在 运行 针对传入流文件的查询。我们将从一组固定的查询开始,然后希望允许一个或多个用户能够更改针对传入消息执行的查询。挑战就在这里,改变正在执行的查询。
我还应该考虑其他选择吗?如果有 10k,更新查询似乎需要大约 20 秒。这很可能很少见,但我正在考虑重新加载/启动时间。
我考虑过的选项:
- 使用 UpdateAttribute 并更新每个流文件。不理想,特别是如果有一堆查询要索引。
- 使用http、AWS SQS等方式发送一个高优先级的流文件进行更新(比其他任何来源都高)。不可怕,但似乎仍然不对。
- 使用 NiFi API 在更新时启动/停止处理器。执行更新似乎不是一种非常有效的方法,尤其是当它们发生得相当频繁时。
实例化监视器:
Monitor monitor = new Monitor(new LuceneQueryParser("field"), new TermFilteredPresearcher());
添加查询 - 我要优化的内容:
//Add queries to the monitor
for (Map.Entry<String, String> entry : bucketList.entrySet()) {
MonitorQuery q = new MonitorQuery(entry.getKey(),entry.getValue());
monitor.update(q);
}
当您的处理器启动时,您可以启动一个后台计时器线程,该线程会定期构建一个新监视器,然后替换处理器正在使用的监视器。
您可能希望在您的处理器中创建一个成员变量,例如:
AtomicReference<Monitor> monitorHolder = new AtomicReference<Monitor>();
然后在@OnScheduled 中您可以构建初始监视器并将其设置在支架中。
然后在 onTrigger 中你总是首先得到 Monitor:
Monitor localMonitor = monitorHolder.get();
然后在后台线程中可以调用monitorHolder.set(newMonitor),不会影响当前处理器的执行,但会在下次调用onTrigger时生效。
我正在尝试使用 Luwak Lucene 索引器创建自定义处理器,因此我可以 运行 查询传入的流文件。我想弄清楚的是更新 Luwak 监视器中存在的查询索引的最佳方法(下面的示例代码)。
编辑 - 更多使用上下文
通过更新,我的意思是允许外部用户添加/更新/删除正在 运行 针对传入流文件的查询。我们将从一组固定的查询开始,然后希望允许一个或多个用户能够更改针对传入消息执行的查询。挑战就在这里,改变正在执行的查询。
我还应该考虑其他选择吗?如果有 10k,更新查询似乎需要大约 20 秒。这很可能很少见,但我正在考虑重新加载/启动时间。
我考虑过的选项:
- 使用 UpdateAttribute 并更新每个流文件。不理想,特别是如果有一堆查询要索引。
- 使用http、AWS SQS等方式发送一个高优先级的流文件进行更新(比其他任何来源都高)。不可怕,但似乎仍然不对。
- 使用 NiFi API 在更新时启动/停止处理器。执行更新似乎不是一种非常有效的方法,尤其是当它们发生得相当频繁时。
实例化监视器:
Monitor monitor = new Monitor(new LuceneQueryParser("field"), new TermFilteredPresearcher());
添加查询 - 我要优化的内容:
//Add queries to the monitor
for (Map.Entry<String, String> entry : bucketList.entrySet()) {
MonitorQuery q = new MonitorQuery(entry.getKey(),entry.getValue());
monitor.update(q);
}
当您的处理器启动时,您可以启动一个后台计时器线程,该线程会定期构建一个新监视器,然后替换处理器正在使用的监视器。
您可能希望在您的处理器中创建一个成员变量,例如:
AtomicReference<Monitor> monitorHolder = new AtomicReference<Monitor>();
然后在@OnScheduled 中您可以构建初始监视器并将其设置在支架中。
然后在 onTrigger 中你总是首先得到 Monitor:
Monitor localMonitor = monitorHolder.get();
然后在后台线程中可以调用monitorHolder.set(newMonitor),不会影响当前处理器的执行,但会在下次调用onTrigger时生效。