NiFi 中的 Python ExecuteScript:转换流文件属性和内容
Python ExecuteScript in NiFi: Transform flowfile attributes & content
我正在尝试在 NiFi 中创建一个 Python 脚本:
- 从传入的流文件中读取一些属性
- 读取流文件的 json 内容并提取特定字段
- 将属性写入传出流文件
- 用脚本中创建的新内容(例如 API 调用返回的新 JSON)覆盖传入流文件并将其发送到 SUCCESS 关系,或者删除旧流文件并创建一个包含所需内容的新流文件
到目前为止我做了什么:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback
class OutputWrite(OutputStreamCallback):
    """NiFi OutputStreamCallback that overwrites flowfile content with JSON.

    The object to serialize is handed to the constructor and stored on the
    instance, because NiFi calls ``process`` with only the output stream.
    (Listing ``obj`` as a base class, as before, raised NameError.)
    """

    def __init__(self, obj):
        # Keep a reference to the payload for process() to serialize later.
        self.obj = obj

    def process(self, outputStream):
        """Write ``self.obj`` as UTF-8 encoded JSON to *outputStream*."""
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))
###end class###
flowfile = session.get()
if flowfile is not None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization'),
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content.
    # session.read(flowfile) returns a raw InputStream that the caller must
    # close. While it is open, NiFi rejects further operations on this
    # flowfile (putAttribute/write below), so close it even if decoding fails.
    stream_content = session.read(flowfile)
    try:
        text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    finally:
        stream_content.close()
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    # NOTE(review): with integer `records` this is floor division under
    # Jython 2.7, so a partial last page is not counted. Confirm whether a
    # rounded-up page count is intended.
    pages = records / 10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: build output_json with the desired data.
    # TODO: replace this placeholder with the real API result. The original
    # `{some data}` was pseudo-code and not valid Python.
    output_json = {}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
我的问题是,我找不到办法把所需 output_json 对象的引用作为参数传给 OutputStreamCallback 类。有什么办法能解决这个问题,或者有更好的做法吗?
在这种情况下,在该类的 process 函数中执行所有 API 操作可能更容易,但我该如何在 process 函数中访问传入流文件的属性(这需要会话或流文件对象)?
非常感谢任何帮助!
我在下面附上了示例 Python 代码,其中自定义的 PyStreamCallback 类实现了转换流文件内容中 JSON 的逻辑(该逻辑取自一篇关于此主题的博客文章,原链接在转换过程中已丢失)。
不过我建议你优先考虑使用 NiFi 的原生处理器 UpdateAttribute 和 EvaluateJsonPath 来完成相关操作,只有在确实需要执行 NiFi 无法开箱即用完成的任务时,才使用自定义代码。
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """StreamCallback that reshapes a ratings JSON document.

    Parses the incoming content as UTF-8 JSON. It emits the value of
    ``rating.primary`` as ``Rating`` and copies every other entry of
    ``rating`` into ``SecondaryRatings``. The result is written back as
    JSON indented by four spaces.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Transform the JSON read from *inputStream* into *outputStream*."""
        source = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        ratings = source['rating']
        # Looked up first, so a missing primary rating fails before the loop.
        primary_value = ratings['primary']['value']
        secondary = {}
        for name, entry in ratings.iteritems():
            if name == "primary":
                continue
            secondary[name] = {"Id": name, "Range": 5, "Value": entry['value']}
        result = {
            "Range": 5,
            "Rating": primary_value,
            "SecondaryRatings": secondary,
        }
        payload = json.dumps(result, indent=4)
        outputStream.write(bytearray(payload.encode('utf-8')))
flowFile = session.get()
if flowFile is not None:
    # Replace the content with the transformed JSON, then rename the file
    # to "<part before the first dot>_translated.json".
    flowFile = session.write(flowFile, PyStreamCallback())
    base_name = flowFile.getAttribute('filename').split('.')[0]
    flowFile = session.putAttribute(flowFile, "filename", base_name + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
更新:
要在回调中访问流文件的属性,只需将流文件作为参数传递给回调的构造函数,将其保存为实例字段,然后在 process
方法中引用它。这是一个非常简单的示例,它将属性 my_attr
的值连接到传入的流文件内容并将其写回:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """StreamCallback that appends the ``my_attr`` attribute to the content.

    The flowfile is captured at construction time, because ``process`` only
    receives the input and output streams.
    """

    def __init__(self, flowfile):
        self.ff = flowfile

    def process(self, inputStream, outputStream):
        """Copy the UTF-8 content through, followed by ``my_attr``'s value."""
        original = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        combined = original + self.ff.getAttribute('my_attr')
        outputStream.write(bytearray(combined.encode('utf-8')))
flowFile = session.get()
if flowFile is not None:
    # The callback needs the flowfile itself to look up my_attr.
    callback = PyStreamCallback(flowFile)
    flowFile = session.write(flowFile, callback)
    session.transfer(flowFile, REL_SUCCESS)
传入流文件:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
Value: '30'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1690494181462176'
Key: 'my_attr'
Value: 'This is an attribute value.'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.
传出流文件:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
Value: '57'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1690494181462176'
Key: 'my_attr'
Value: 'This is an attribute value.'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.This is an attribute value.
你可以试试这样的写法:
import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
class TransformCallback(StreamCallback):
    """StreamCallback skeleton that parses, transforms and rewrites JSON.

    The transformation step is a placeholder: as written, the parsed input
    is re-serialized unchanged.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Read JSON from *inputStream*, transform it, write it to *outputStream*.

        Any error is printed with its traceback to stdout and then re-raised,
        so the failure still propagates out of ``session.write``.
        """
        try:
            # Read input FlowFile content
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            input_obj = json.loads(input_text)

            # Transform content
            output_obj = input_obj  # your input content
            # perform data transformation on output_obj

            # Write output content. Serialize output_obj itself: the previous
            # code referenced an undefined name (outputJson) and always
            # raised NameError.
            output_text = json.dumps(output_obj)
            outputStream.write(StringUtil.toBytes(output_text))
        except:
            # Intentionally broad: log whatever went wrong (presumably also
            # Java exceptions under Jython -- confirm), then re-raise so
            # nothing is swallowed.
            traceback.print_exc(file=sys.stdout)
            raise
flowFile = session.get()
if flowFile is not None:
    # Rewrite the content in place with the transformed JSON.
    callback = TransformCallback()
    flowFile = session.write(flowFile, callback)
    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)
我正在尝试在 NiFi 中创建一个 Python 脚本:
- 从传入的流文件中读取一些属性
- 读取流文件的 json 内容并提取特定字段
- 将属性写入传出流文件
- 用脚本中创建的新内容(例如 API 调用返回的新 JSON)覆盖传入流文件并将其发送到 SUCCESS 关系,或者删除旧流文件并创建一个包含所需内容的新流文件
到目前为止我做了什么:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback
class OutputWrite(OutputStreamCallback):
    """NiFi OutputStreamCallback that overwrites flowfile content with JSON.

    The object to serialize is handed to the constructor and stored on the
    instance, because NiFi calls ``process`` with only the output stream.
    (Listing ``obj`` as a base class, as before, raised NameError.)
    """

    def __init__(self, obj):
        # Keep a reference to the payload for process() to serialize later.
        self.obj = obj

    def process(self, outputStream):
        """Write ``self.obj`` as UTF-8 encoded JSON to *outputStream*."""
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))
###end class###
flowfile = session.get()
if flowfile is not None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization'),
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content.
    # session.read(flowfile) returns a raw InputStream that the caller must
    # close. While it is open, NiFi rejects further operations on this
    # flowfile (putAttribute/write below), so close it even if decoding fails.
    stream_content = session.read(flowfile)
    try:
        text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    finally:
        stream_content.close()
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    # NOTE(review): with integer `records` this is floor division under
    # Jython 2.7, so a partial last page is not counted. Confirm whether a
    # rounded-up page count is intended.
    pages = records / 10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: build output_json with the desired data.
    # TODO: replace this placeholder with the real API result. The original
    # `{some data}` was pseudo-code and not valid Python.
    output_json = {}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
我的问题是,我找不到办法把所需 output_json 对象的引用作为参数传给 OutputStreamCallback 类。有什么办法能解决这个问题,或者有更好的做法吗?
在这种情况下,在该类的 process 函数中执行所有 API 操作可能更容易,但我该如何在 process 函数中访问传入流文件的属性(这需要会话或流文件对象)?
非常感谢任何帮助!
我在下面附上了示例 Python 代码,其中自定义的 PyStreamCallback 类实现了转换流文件内容中 JSON 的逻辑(该逻辑取自一篇关于此主题的博客文章,原链接在转换过程中已丢失)。
不过我建议你优先考虑使用 NiFi 的原生处理器 UpdateAttribute 和 EvaluateJsonPath 来完成相关操作,只有在确实需要执行 NiFi 无法开箱即用完成的任务时,才使用自定义代码。
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """StreamCallback that reshapes a ratings JSON document.

    Parses the incoming content as UTF-8 JSON. It emits the value of
    ``rating.primary`` as ``Rating`` and copies every other entry of
    ``rating`` into ``SecondaryRatings``. The result is written back as
    JSON indented by four spaces.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Transform the JSON read from *inputStream* into *outputStream*."""
        source = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        ratings = source['rating']
        # Looked up first, so a missing primary rating fails before the loop.
        primary_value = ratings['primary']['value']
        secondary = {}
        for name, entry in ratings.iteritems():
            if name == "primary":
                continue
            secondary[name] = {"Id": name, "Range": 5, "Value": entry['value']}
        result = {
            "Range": 5,
            "Rating": primary_value,
            "SecondaryRatings": secondary,
        }
        payload = json.dumps(result, indent=4)
        outputStream.write(bytearray(payload.encode('utf-8')))
flowFile = session.get()
if flowFile is not None:
    # Replace the content with the transformed JSON, then rename the file
    # to "<part before the first dot>_translated.json".
    flowFile = session.write(flowFile, PyStreamCallback())
    base_name = flowFile.getAttribute('filename').split('.')[0]
    flowFile = session.putAttribute(flowFile, "filename", base_name + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
更新:
要在回调中访问流文件的属性,只需将流文件作为参数传递给回调的构造函数,将其保存为实例字段,然后在 process
方法中引用它。这是一个非常简单的示例,它将属性 my_attr
的值连接到传入的流文件内容并将其写回:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """StreamCallback that appends the ``my_attr`` attribute to the content.

    The flowfile is captured at construction time, because ``process`` only
    receives the input and output streams.
    """

    def __init__(self, flowfile):
        self.ff = flowfile

    def process(self, inputStream, outputStream):
        """Copy the UTF-8 content through, followed by ``my_attr``'s value."""
        original = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        combined = original + self.ff.getAttribute('my_attr')
        outputStream.write(bytearray(combined.encode('utf-8')))
flowFile = session.get()
if flowFile is not None:
    # The callback needs the flowfile itself to look up my_attr.
    callback = PyStreamCallback(flowFile)
    flowFile = session.write(flowFile, callback)
    session.transfer(flowFile, REL_SUCCESS)
传入流文件:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
Value: '30'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1690494181462176'
Key: 'my_attr'
Value: 'This is an attribute value.'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.
传出流文件:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
Value: '57'
FlowFile Attribute Map Content
Key: 'filename'
Value: '1690494181462176'
Key: 'my_attr'
Value: 'This is an attribute value.'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.This is an attribute value.
你可以试试这样的写法:
import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
class TransformCallback(StreamCallback):
    """StreamCallback skeleton that parses, transforms and rewrites JSON.

    The transformation step is a placeholder: as written, the parsed input
    is re-serialized unchanged.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Read JSON from *inputStream*, transform it, write it to *outputStream*.

        Any error is printed with its traceback to stdout and then re-raised,
        so the failure still propagates out of ``session.write``.
        """
        try:
            # Read input FlowFile content
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            input_obj = json.loads(input_text)

            # Transform content
            output_obj = input_obj  # your input content
            # perform data transformation on output_obj

            # Write output content. Serialize output_obj itself: the previous
            # code referenced an undefined name (outputJson) and always
            # raised NameError.
            output_text = json.dumps(output_obj)
            outputStream.write(StringUtil.toBytes(output_text))
        except:
            # Intentionally broad: log whatever went wrong (presumably also
            # Java exceptions under Jython -- confirm), then re-raise so
            # nothing is swallowed.
            traceback.print_exc(file=sys.stdout)
            raise
flowFile = session.get()
if flowFile is not None:
    # Rewrite the content in place with the transformed JSON.
    callback = TransformCallback()
    flowFile = session.write(flowFile, callback)
    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)