如何根据Elastic中的数据生成N个FlowFile并设置每个FlowFile的内容?
How to generate N FlowFiles and set the content of each FlowFile according to the data in Elastic?
在 Elasticsearch 中我有这个索引和映射:
PUT /myindex
{
"mappings": {
"myentries": {
"_all": {
"enabled": false
},
"properties": {
"yid": {"type": "keyword"},
"days": {
"properties": {
"Type1": { "type": "date" },
"Type2": { "type": "date" }
}
},
"directions": {
"properties": {
"name": {"type": "keyword"},
"recorder": { "type": "keyword" },
"direction": { "type": "integer" }
}
}
}
}
}
}
我想生成 N 个流文件,映射 directions
中 recorder
和 direction
值的每个组合 1 个。我怎样才能在 Nifi 中做到这一点?我正在考虑使用 GenerateFlowFile
,但如何应用与 Elasticsearch 相关的逻辑?
一种可能的解决方法是使用 GenerateFlowFile
生成 N 个 FlowFiles,其中 Batch
字段可以被硬编码并设置为 10(Elastic 中的条目数)。但是我不知道下一步应该做什么?
GenerateFlowFile
可能不是这里的正确工具,因为它不接受传入连接,因此您无法使用计数对其进行参数化。您可以使用 SplitJson
,它会将流文件拆分为多个流文件,给定一个 JSONPath 表达式,return 是 [=37] 中的一个数组=] 内容。
更新
这是一个 great tool,您可以使用它来动态评估 JSONPath 并查看它匹配的内容。在您的示例中,假设您收到了如下数据:
{
"yid": "nifi",
"days" : [{"Type1": "09/07/2017"},{"Type2":"10/07/2017"}],
"directions": [
{
"name": "San Francisco",
"recorder" : "Samsung",
"direction": "0"
},
{
"name": "Santa Monica",
"recorder" : "iPhone",
"direction": "270"
},
{
"name": "San Diego",
"recorder" : "Razr",
"direction": "180"
},
{
"name": "Santa Clara",
"recorder" : "Android",
"direction": "0"
}
]
}
JSON路径表达式 $.directions[*].direction
将 return:
[
"0",
"270",
"180",
"0"
]
这将允许 SplitJson
使用派生内容和 fragment
属性创建四个流文件以将它们关联回原始流文件。
如果您确实需要对生成的方向和记录器值执行排列逻辑,您可能需要使用 ExecuteScript
和一个简单的 Groovy/Ruby/Python 脚本来内联执行该操作并拆分生成的结果值。
在 Elasticsearch 中我有这个索引和映射:
PUT /myindex
{
"mappings": {
"myentries": {
"_all": {
"enabled": false
},
"properties": {
"yid": {"type": "keyword"},
"days": {
"properties": {
"Type1": { "type": "date" },
"Type2": { "type": "date" }
}
},
"directions": {
"properties": {
"name": {"type": "keyword"},
"recorder": { "type": "keyword" },
"direction": { "type": "integer" }
}
}
}
}
}
}
我想生成 N 个流文件,映射 directions
中 recorder
和 direction
值的每个组合 1 个。我怎样才能在 Nifi 中做到这一点?我正在考虑使用 GenerateFlowFile
,但如何应用与 Elasticsearch 相关的逻辑?
一种可能的解决方法是使用 GenerateFlowFile
生成 N 个 FlowFiles,其中 Batch
字段可以被硬编码并设置为 10(Elastic 中的条目数)。但是我不知道下一步应该做什么?
GenerateFlowFile
可能不是这里的正确工具,因为它不接受传入连接,因此您无法使用计数对其进行参数化。您可以使用 SplitJson
,它会将流文件拆分为多个流文件,给定一个 JSONPath 表达式,return 是 [=37] 中的一个数组=] 内容。
更新
这是一个 great tool,您可以使用它来动态评估 JSONPath 并查看它匹配的内容。在您的示例中,假设您收到了如下数据:
{
"yid": "nifi",
"days" : [{"Type1": "09/07/2017"},{"Type2":"10/07/2017"}],
"directions": [
{
"name": "San Francisco",
"recorder" : "Samsung",
"direction": "0"
},
{
"name": "Santa Monica",
"recorder" : "iPhone",
"direction": "270"
},
{
"name": "San Diego",
"recorder" : "Razr",
"direction": "180"
},
{
"name": "Santa Clara",
"recorder" : "Android",
"direction": "0"
}
]
}
JSON路径表达式 $.directions[*].direction
将 return:
[
"0",
"270",
"180",
"0"
]
这将允许 SplitJson
使用派生内容和 fragment
属性创建四个流文件以将它们关联回原始流文件。
如果您确实需要对生成的方向和记录器值执行排列逻辑,您可能需要使用 ExecuteScript
和一个简单的 Groovy/Ruby/Python 脚本来内联执行该操作并拆分生成的结果值。