访问 Json 元素并使用 python ExecuteScript 处理器写入文本文件

Access Json element and write to a text file using python ExecuteScript processor

我是 python 和 nifi 的新手。

我的流程是GetFile-->ExecuteScript

在脚本中,对于每个 json,我想访问一个特定的元素并将其逐行写入文本文件。

我尝试了以下方法:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
  def __init__(self):
    pass
  def process(self, inputStream, outputStream):
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  json_content = json.loads(text)
  try:
     body = json_content['id']['body']
     body_encoded = body.encode('utf-8')
  except (KeyError,TypeError,ValueError):
     body_encoded = ''

  text_file = open ('/tmp/test/testFile.txt', 'w')   
  text_file.write("%s"%body_encoded)
  text_file.close()
  outputStream.write(bytearray(json.dumps(body, indent=4).encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)

但是在testFile.txt中,访问的正文没有被写入。

我在这里想念什么?

您的 Python class 的正文没有缩进,过程方法的正文也没有。尝试从 def init 行到 outputStream.write 行缩进一级,然后再次从 text = IOUtils.toString 行到 outputStream.write 缩进一级行,这应该给你一个工作 StreamCallback class 并使脚本正常工作。

此外,您不需要调用 session.commit(),脚本完成后会为您调用。

EDIT(由于 OP 编辑​​——见评论):上面的脚本仍然没有正确缩进,process() 方法的主体需要缩进。您是否在 ExecuteScript 处理器上收到错误或公告?如果传入的流文件在 ExecuteScript 之前排队,那么 "flowFile = session.get()" 不会被执行,或者处理器应该抛出错误并发布公告(右上角的红色框)。

此外,由于您打算在流文件中将相同的内容发送出处理器,因此您不需要 "text_file" 代码,我认为这是为了调试?