Nifi 在 ExecuteScript 中出错
Nifi getting error in ExecuteScript
我一直在尝试从 nifi 的 ExecuteScript 处理器中提取数据并将其作为属性附加到流文件中。我尝试了很多来源,尤其是 Matt Burgess 的 funnifi 博客中的那个。
以下是我的代码
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
originalFlowFile = session.get()
text = IOUtils.toString(originalFlowFile)
log.info(text)
if(originalFlowFile != None):
event = json.loads(text)
if (event['true'] == 'Y'):
flowfile = session.putAttribute(flowfile, "true", "Y")
elif (event['src'] == 'ONE' ):
allAttributes = { "true": "N", "src": "ONE" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
elif (event['src'] == 'TWO' ):
allAttributes = { "true": "N", "src": "TWO" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
这在 python 中。流文件数据是 JSON。尽管如此,我还是无法解析它。
INFO部分是
的输出
text = IOUtils.toString(originalFlowFile)
如有任何帮助,我们将不胜感激。
p.s。我不熟悉 python
测试数据
{
"true":"N",
"src":"ONE",
"var1":"value1",
"var2":"value2"
}
更新
我更新的代码,仍然不起作用:
import json
import java.io
from org.apache.commons.io import IOUtils
originalFlowFile = session.get()
if(originalFlowFile != None):
inputStream = session.read(originalFlowFile)
text = IOUtils.toString(inputStream)
log.info(text)
event = json.loads(text)
if (event['true'] == 'Y'):
flowfile = session.putAttribute(flowfile, "true", "Y")
elif (event['src'] == 'ONE' ):
allAttributes = { "true": "N", "src": "ONE" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
elif (event['src'] == 'TWO' ):
allAttributes = { "true": "N", "src": "TWO" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
在流文件本身上调用 IOUtils.toString() 可能不会起作用,因为流文件不是 InputStream 或 Reader,也不是可以自行读取的内容。我真的很惊讶该行没有产生异常。
有两种方法可以获取流文件的内容...
第一种是从会话中获取流文件的 InputStream:
originalFlowFile = session.get();
inputStream = session.read(originalFlowFile);
text = IOUtils.toString(inputStream);
第二种是使用 InputStreamCallback:
flowFile = session.read(flowFile, {inputStream ->
// read the inputStream
} as InputStreamCallback);
我一直在尝试从 nifi 的 ExecuteScript 处理器中提取数据并将其作为属性附加到流文件中。我尝试了很多来源,尤其是 Matt Burgess 的 funnifi 博客中的那个。
以下是我的代码
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
originalFlowFile = session.get()
text = IOUtils.toString(originalFlowFile)
log.info(text)
if(originalFlowFile != None):
event = json.loads(text)
if (event['true'] == 'Y'):
flowfile = session.putAttribute(flowfile, "true", "Y")
elif (event['src'] == 'ONE' ):
allAttributes = { "true": "N", "src": "ONE" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
elif (event['src'] == 'TWO' ):
allAttributes = { "true": "N", "src": "TWO" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
这在 python 中。流文件数据是 JSON。尽管如此,我还是无法解析它。
INFO部分是
的输出text = IOUtils.toString(originalFlowFile)
如有任何帮助,我们将不胜感激。
p.s。我不熟悉 python
测试数据
{
"true":"N",
"src":"ONE",
"var1":"value1",
"var2":"value2"
}
更新
我更新的代码,仍然不起作用:
import json
import java.io
from org.apache.commons.io import IOUtils
originalFlowFile = session.get()
if(originalFlowFile != None):
inputStream = session.read(originalFlowFile)
text = IOUtils.toString(inputStream)
log.info(text)
event = json.loads(text)
if (event['true'] == 'Y'):
flowfile = session.putAttribute(flowfile, "true", "Y")
elif (event['src'] == 'ONE' ):
allAttributes = { "true": "N", "src": "ONE" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
elif (event['src'] == 'TWO' ):
allAttributes = { "true": "N", "src": "TWO" }
flowfile = session.putAllAttributes(flowfile, allAttributes)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
在流文件本身上调用 IOUtils.toString() 可能不会起作用,因为流文件不是 InputStream 或 Reader,也不是可以自行读取的内容。我真的很惊讶该行没有产生异常。
有两种方法可以获取流文件的内容...
第一种是从会话中获取流文件的 InputStream:
originalFlowFile = session.get();
inputStream = session.read(originalFlowFile);
text = IOUtils.toString(inputStream);
第二种是使用 InputStreamCallback:
flowFile = session.read(flowFile, {inputStream ->
// read the inputStream
} as InputStreamCallback);