如何根据Elastic中的数据生成N个FlowFile并设置每个FlowFile的内容?

How to generate N FlowFiles and set the content of each FlowFile according to the data in Elastic?

在 Elasticsearch 中我有这个索引和映射:

PUT /myindex
{
  "mappings": {
    "myentries": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
          "yid": {"type": "keyword"},
          "days": { 
              "properties": {
                        "Type1":  { "type": "date" },
                        "Type2":  { "type": "date" }
              }
            },
            "directions": { 
              "properties": {
                      "name": {"type": "keyword"},
                      "recorder":  { "type": "keyword" },
                      "direction":  { "type": "integer" }
              }
            }
        }
    }
  }
}

我想生成 N 个流文件,映射 directionsrecorderdirection 值的每个组合 1 个。我怎样才能在 Nifi 中做到这一点?我正在考虑使用 GenerateFlowFile,但如何应用与 Elasticsearch 相关的逻辑?

一种可能的解决方法是使用 GenerateFlowFile 生成 N 个 FlowFiles,其中 Batch 字段可以被硬编码并设置为 10(Elastic 中的条目数)。但是我不知道下一步应该做什么?

GenerateFlowFile 可能不是这里的正确工具,因为它不接受传入连接,因此您无法使用计数对其进行参数化。您可以使用 SplitJson,它会将流文件拆分为多个流文件,给定一个 JSONPath 表达式,return 是 [=37] 中的一个数组=] 内容。

更新

这是一个 great tool,您可以使用它来动态评估 JSONPath 并查看它匹配的内容。在您的示例中,假设您收到了如下数据:

{
  "yid": "nifi",
  "days" : [{"Type1": "09/07/2017"},{"Type2":"10/07/2017"}],
  "directions": [
    {
        "name": "San Francisco",
      "recorder"  : "Samsung",
      "direction": "0"
    },
    {
        "name": "Santa Monica",
      "recorder"  : "iPhone",
      "direction": "270"
    },
    {
        "name": "San Diego",
      "recorder"  : "Razr",
      "direction": "180"
    },
    {
        "name": "Santa Clara",
      "recorder"  : "Android",
      "direction": "0"
    }
  ]
}

JSON路径表达式 $.directions[*].direction 将 return:

[
  "0",
  "270",
  "180",
  "0"
]

这将允许 SplitJson 使用派生内容和 fragment 属性创建四个流文件以将它们关联回原始流文件。

如果您确实需要对生成的方向和记录器值执行排列逻辑,您可能需要使用 ExecuteScript 和一个简单的 Groovy/Ruby/Python 脚本来内联执行该操作并拆分生成的结果值。