使用 NiFi 拉取 Elasticsearch 索引

Using NiFi to pull Elasticsearch Indexes

我正在使用 NiFi(最近开始使用它,它似乎适合我的需要)。我们最近建立了一个 Spark/Hadoop 集群,并且已经使用 Elasticsearch 大约 2 年了。我的目标是将特定索引从 Elasticsearch 获取到 HDFS(特别是系统日志)。我正在做一个用于异常检测的机器学习项目,但想处理来自 HDFS 的数据以加快速度。

那么,一些背景知识 - 我们的系统日志索引每天都不同(logstash-syslog-2017-11-20,等等)。我只需要系统日志中的消息,所以基本上我想做的是:

ES -> NiFi -> Parse JSON to give me back text -> write each message to its own line in a text file. 

最后,在我的 HDFS 中,每个索引(天)都有消息的文本文件,例如:

syslog-2017-11-19
syslog-2017-11-20
syslog-2017-11-21

等等....

我被几件事难住了:

  1. 构建它需要哪些组件?我看到有 GenerateFlowFile,我认为我需要使索引名称动态化。

  2. 由于要拉取整个索引,我想我需要使用"ScrollElasticSearchHttp",但我不确定。还有其他选择,但我不知道什么是最好的。使用 PySpark 时,我使用 ES-Hadoop 连接器完成了简单的查询以获取整个索引,但不得不将滚动大小增加到 10k,以使其更快 运行。只是对我应该使用什么处理器感到困惑。

如果有人能告诉我这个结构(什么处理器、连接器等),我需要从系统日志中获取消息索引,从 ES 到我的 HDFS,那就太好了。还在学习这个,所以请原谅我对此的无知。非常感谢您的帮助!

还有 ListenBeats 个处理器。您可以将 Logstash 重定向到 NiFi,Nifi 可以同时写入 EL 和 HDF。确实,这会将 NiFi 置于您的关键路径上。

也可以编写您自己的处理器,而且您可以很容易地做到这一点。关注this article

我最近也发现了Nifi,我觉得很棒。玩了一下,因此我不是专家。

我发布了我的第一条评论作为答案,因为这最终成为了我的解决方案。

根据我上面的评论,我最终使用了 ScrollElasticsearchHttp 处理器,似乎我的某些选项格式不正确。一旦我得到正确的格式,它就起作用了。我希望 NiFi 文档有更多示例/明确示例来显示格式并将其与 ES-Hadoop 格式化选项区分开来。无论如何,现在一切正常。我有兴趣研究编写我自己的处理器 - 是否有相关指南或相关内容?