如何正确合并多个 FlowFile?
How to properly merge multiple FlowFile's?
我使用 MergeContent
1.3.0 来合并来自 2 个来源的 FlowFiles:1) 来自 ListenHTTP 和 2) 来自 QueryElasticsearchHTTP
.
问题在于合并结果是 JSON 字符串的列表。如何将它们转换为单个 JSON 字符串?
{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
我会得到这个结果:
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
可能吗?
更新:
在 Elastic 中更改数据结构后,我能够得出以下 MergeContent
的输出结果。现在我在两个 JSON 字符串中都有一个公共字段 eid
。我想通过 eid
合并这些字符串以获得单个 JSON 文件。我应该使用哪个运算符?
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
我需要得到以下输出:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}
建议使用ExecuteScript
合并文件。但是我不知道该怎么做。这是我试过的:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
obj = json.loads(text)
newObj = {
"eid": obj['eid'],
"zid": obj['zid'],
...
}
outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
flowFile1 = session.get()
flowFile2 = session.get()
if (flowFile1 != None && flowFile2 != None):
# WHAT SHOULD I PUT HERE??
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
将两种不同类型的数据连接在一起并不是 MergeContent 的真正用途。
您需要编写一个自定义处理器或自定义脚本来理解您传入的数据格式并创建新的输出。
如果您已将 ListenHttp 连接到 QueryElasticSearchHttp,这意味着您正在根据来自 ListenHttp 的流文件触发查询,那么您可能想要制作一个自定义版本的 QueryElasticSearchHttp 以获取传入流文件的内容并将其与任何传出结果结合在一起。
这是查询结果当前写入流文件的位置:
另一种选择是使用 ExecuteScript 并编写一个脚本,该脚本可以获取多个流文件并按照您描述的方式将它们合并在一起。
示例如何使用过滤从传入队列中读取多个文件
假设您有多对具有以下内容的流文件:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
和
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
eid
字段的相同值在对之间提供 link。
在合并之前,我们必须提取eid
字段的值,并将其放入流文件的na属性中以进行快速过滤。
使用具有属性的 EvaluateJsonPath
处理器:
Destination : flowfile-attribute
eid : $.eid
在此之后,您将拥有流程文件的新 eid
属性。
然后使用带有 groovy 语言和以下代码的 ExecuteScript 处理器:
import org.apache.nifi.processor.FlowFileFilter;
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
//get first flow file
def ff0 = session.get()
if(!ff0)return
def eid = ff0.getAttribute('eid')
//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
public FlowFileFilterResult filter(FlowFile ff) {
if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
return FlowFileFilterResult.REJECT_AND_CONTINUE
}
})
//let's assume you require two additional files in queue with the same attribute
if( !ffList || ffList.size()<1 ){
//if less than required
//rollback current session with penalize retrieved files so they will go to the end of the incoming queue
//with pre-configured penalty delay (default 30sec)
session.rollback(true)
return
}
//let's put all in one list to simplify later iterations
ffList.add(ff0)
if( ffList.size()>2 ){
//for example unexpected situation. you have more files then expected
//redirect all of them to failure
session.transfer(ffList, REL_FAILURE)
return
}
//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
session.read(ff).withStream{rawIn->
def fjson = new JsonSlurper().parse(rawIn)
json.putAll(fjson)
}
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
rawOut.withWriter("UTF-8"){writer->
new JsonBuilder(json).writeTo(writer)
}
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")
session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
我使用 MergeContent
1.3.0 来合并来自 2 个来源的 FlowFiles:1) 来自 ListenHTTP 和 2) 来自 QueryElasticsearchHTTP
.
问题在于合并结果是 JSON 字符串的列表。如何将它们转换为单个 JSON 字符串?
{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
我会得到这个结果:
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
可能吗?
更新:
在 Elastic 中更改数据结构后,我能够得出以下 MergeContent
的输出结果。现在我在两个 JSON 字符串中都有一个公共字段 eid
。我想通过 eid
合并这些字符串以获得单个 JSON 文件。我应该使用哪个运算符?
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
我需要得到以下输出:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}
建议使用ExecuteScript
合并文件。但是我不知道该怎么做。这是我试过的:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
obj = json.loads(text)
newObj = {
"eid": obj['eid'],
"zid": obj['zid'],
...
}
outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
flowFile1 = session.get()
flowFile2 = session.get()
if (flowFile1 != None && flowFile2 != None):
# WHAT SHOULD I PUT HERE??
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
将两种不同类型的数据连接在一起并不是 MergeContent 的真正用途。
您需要编写一个自定义处理器或自定义脚本来理解您传入的数据格式并创建新的输出。
如果您已将 ListenHttp 连接到 QueryElasticSearchHttp,这意味着您正在根据来自 ListenHttp 的流文件触发查询,那么您可能想要制作一个自定义版本的 QueryElasticSearchHttp 以获取传入流文件的内容并将其与任何传出结果结合在一起。
这是查询结果当前写入流文件的位置:
另一种选择是使用 ExecuteScript 并编写一个脚本,该脚本可以获取多个流文件并按照您描述的方式将它们合并在一起。
示例如何使用过滤从传入队列中读取多个文件
假设您有多对具有以下内容的流文件:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
和
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
eid
字段的相同值在对之间提供 link。
在合并之前,我们必须提取eid
字段的值,并将其放入流文件的na属性中以进行快速过滤。
使用具有属性的 EvaluateJsonPath
处理器:
Destination : flowfile-attribute
eid : $.eid
在此之后,您将拥有流程文件的新 eid
属性。
然后使用带有 groovy 语言和以下代码的 ExecuteScript 处理器:
import org.apache.nifi.processor.FlowFileFilter;
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
//get first flow file
def ff0 = session.get()
if(!ff0)return
def eid = ff0.getAttribute('eid')
//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
public FlowFileFilterResult filter(FlowFile ff) {
if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
return FlowFileFilterResult.REJECT_AND_CONTINUE
}
})
//let's assume you require two additional files in queue with the same attribute
if( !ffList || ffList.size()<1 ){
//if less than required
//rollback current session with penalize retrieved files so they will go to the end of the incoming queue
//with pre-configured penalty delay (default 30sec)
session.rollback(true)
return
}
//let's put all in one list to simplify later iterations
ffList.add(ff0)
if( ffList.size()>2 ){
//for example unexpected situation. you have more files then expected
//redirect all of them to failure
session.transfer(ffList, REL_FAILURE)
return
}
//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
session.read(ff).withStream{rawIn->
def fjson = new JsonSlurper().parse(rawIn)
json.putAll(fjson)
}
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
rawOut.withWriter("UTF-8"){writer->
new JsonBuilder(json).writeTo(writer)
}
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")
session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)