访问 Json 元素并使用 python ExecuteScript 处理器写入文本文件
Access Json element and write to a text file using python ExecuteScript processor
我是 python 和 nifi 的新手。
我的流程是GetFile-->ExecuteScript
在脚本中,对于每个 json,我想访问一个特定的元素并将其逐行写入文本文件。
我尝试了以下方法:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
json_content = json.loads(text)
try:
body = json_content['id']['body']
body_encoded = body.encode('utf-8')
except (KeyError,TypeError,ValueError):
body_encoded = ''
text_file = open ('/tmp/test/testFile.txt', 'w')
text_file.write("%s"%body_encoded)
text_file.close()
outputStream.write(bytearray(json.dumps(body, indent=4).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
但是在testFile.txt中,访问的正文没有被写入。
我在这里想念什么?
您的 Python class 的正文没有缩进,过程方法的正文也没有。尝试从 def init 行到 outputStream.write 行缩进一级,然后再次从 text = IOUtils.toString 行到 outputStream.write 缩进一级行,这应该给你一个工作 StreamCallback class 并使脚本正常工作。
此外,您不需要调用 session.commit(),脚本完成后会为您调用。
EDIT(由于 OP 编辑——见评论):上面的脚本仍然没有正确缩进,process() 方法的主体需要缩进。您是否在 ExecuteScript 处理器上收到错误或公告?如果传入的流文件在 ExecuteScript 之前排队,那么 "flowFile = session.get()" 不会被执行,或者处理器应该抛出错误并发布公告(右上角的红色框)。
此外,由于您打算在流文件中将相同的内容发送出处理器,因此您不需要 "text_file" 代码,我认为这是为了调试?
我是 python 和 nifi 的新手。
我的流程是GetFile-->ExecuteScript
在脚本中,对于每个 json,我想访问一个特定的元素并将其逐行写入文本文件。
我尝试了以下方法:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
json_content = json.loads(text)
try:
body = json_content['id']['body']
body_encoded = body.encode('utf-8')
except (KeyError,TypeError,ValueError):
body_encoded = ''
text_file = open ('/tmp/test/testFile.txt', 'w')
text_file.write("%s"%body_encoded)
text_file.close()
outputStream.write(bytearray(json.dumps(body, indent=4).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
但是在testFile.txt中,访问的正文没有被写入。
我在这里想念什么?
您的 Python class 的正文没有缩进,过程方法的正文也没有。尝试从 def init 行到 outputStream.write 行缩进一级,然后再次从 text = IOUtils.toString 行到 outputStream.write 缩进一级行,这应该给你一个工作 StreamCallback class 并使脚本正常工作。
此外,您不需要调用 session.commit(),脚本完成后会为您调用。
EDIT(由于 OP 编辑——见评论):上面的脚本仍然没有正确缩进,process() 方法的主体需要缩进。您是否在 ExecuteScript 处理器上收到错误或公告?如果传入的流文件在 ExecuteScript 之前排队,那么 "flowFile = session.get()" 不会被执行,或者处理器应该抛出错误并发布公告(右上角的红色框)。
此外,由于您打算在流文件中将相同的内容发送出处理器,因此您不需要 "text_file" 代码,我认为这是为了调试?