Nifi ExecuteScript with python: OOM exception
The flow is: a trigger processor -> GenerateTableFetch -> ExecuteSQLRecord -> UpdateAttribute -> ExecuteScript (Python) -> ...., as shown in the figure.
However, ExecuteScript throws an OutOfMemoryError. What should I do? TIA
Log:
2019-11-15 16:37:33,466 ERROR [pool-14-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Unable to checkpoint FlowFile Repository due to java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
Script:
```python
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class PyStreamCallback(StreamCallback):
    def __init__(self, flowfile):
        self.ff = flowfile

    def process(self, inputStream, outputStream):
        # prop_col_names = self.ff.getAttribute('prop_col_names')
        prop_col_names = ['FACTORYNO']
        # Reads the entire FlowFile content into one string, then parses all of it
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        results_prop = []
        results_basic = []
        for row in obj:
            main_id = row['COMBINEPICKUPNO']
            # main_prop_table
            for col_name in prop_col_names:
                # TODO: read from variable
                d = {}
                if col_name == 'FACTORYNO':
                    d = {'VALUE': row[col_name],
                         'ID': col_name,
                         }
                results_prop.append(d)
            # TODO: read from variable
            basic_info = {
                'ID': main_id,
            }
            results_basic.append(basic_info)
        data = {
            'prop': results_prop,
            'basic': results_basic
        }
        outputStream.write(bytearray(json.dumps(data, separators=(',', ':')).encode('utf-8')))


# session and REL_SUCCESS are bound by the ExecuteScript processor
flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)
```
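To check the mapping outside NiFi (where `session`, `StreamCallback` and `REL_SUCCESS` do not exist), the body of `process()` can be pulled into a plain function. This is a sketch: `transform_rows` is a hypothetical name, and it assumes the input has already been parsed into a list of dicts. Written this way it is also easier to see why memory grows with the input: during `process()` the raw text, the fully parsed list and both result lists are all alive at the same time.

```python
import json


def transform_rows(rows, prop_col_names=('FACTORYNO',)):
    """Same row mapping as PyStreamCallback.process, without NiFi stream I/O."""
    results_prop = []
    results_basic = []
    for row in rows:
        for col_name in prop_col_names:
            d = {}
            if col_name == 'FACTORYNO':
                d = {'VALUE': row[col_name], 'ID': col_name}
            results_prop.append(d)
        # One small dict per input row in each result list
        results_basic.append({'ID': row['COMBINEPICKUPNO']})
    return {'prop': results_prop, 'basic': results_basic}


rows = json.loads('[{"COMBINEPICKUPNO": "P1", "FACTORYNO": "F9"}]')
print(json.dumps(transform_rows(rows), separators=(',', ':'), sort_keys=True))
# prints {"basic":[{"ID":"P1"}],"prop":[{"ID":"FACTORYNO","VALUE":"F9"}]}
```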
Java 1.8
NiFi 1.9.2
If you are reading a large file into memory, daggett is right: you can run out of memory.
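One way to avoid holding the whole content as a single string plus a fully parsed list is to parse the top-level JSON array element by element while reading. The sketch below uses the standard library's `json.JSONDecoder.raw_decode` on a growing buffer; it is an illustration under assumptions, not part of NiFi's API. `iter_json_array` is a hypothetical helper, and it expects a `read_chunk(n)` callable that returns text. In ExecuteScript you would still need an adapter that turns the Java `InputStream` into such a callable (for example via `java.io.InputStreamReader`); that adapter is not shown.

```python
import io
import json


def iter_json_array(read_chunk, chunk_size=65536):
    """Yield the elements of a top-level JSON array one at a time.

    read_chunk(n) must return up to n characters of text, or '' at end of input.
    Only the not-yet-parsed tail is buffered, so memory use is roughly the
    largest single element plus one chunk. Anything after the final ']' is ignored.
    """
    decoder = json.JSONDecoder()
    buf = ''
    eof = False
    started = False      # the opening '[' has been consumed
    after_comma = False  # a ',' was consumed, so another value must follow

    while True:
        buf = buf.lstrip()
        if buf and not started:
            if buf[0] != '[':
                raise ValueError('expected a JSON array')
            buf = buf[1:]
            started = True
            continue
        if buf and started:
            if buf[0] == ']' and not after_comma:
                return  # end of the array
            try:
                value, end = decoder.raw_decode(buf)
            except ValueError:
                end = -1  # element incomplete (or invalid): read more input
            if end >= 0:
                rest = buf[end:].lstrip()
                # Accept the element only once the following ',' or ']' is
                # visible; otherwise it may be a number cut at a chunk boundary.
                if rest[:1] == ',':
                    buf, after_comma = rest[1:], True
                    yield value
                    continue
                if rest[:1] == ']':
                    buf, after_comma = rest, False
                    yield value
                    continue
        if eof:
            raise ValueError('truncated or malformed JSON array')
        chunk = read_chunk(chunk_size)
        if not chunk:
            eof = True
        buf += chunk


# Usage: any object with a read(n) method returning text works
reader = io.StringIO(u'[{"COMBINEPICKUPNO": "P1"}, {"COMBINEPICKUPNO": "P2"}]')
for row in iter_json_array(reader.read, chunk_size=8):
    print(row['COMBINEPICKUPNO'])
```

This only removes the full-text copy and the fully parsed list; the script as written still accumulates `results_prop` and `results_basic` in memory before writing them out.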
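Alternatively, I believe there is a known memory leak in Jython, but I can't find the page I was looking for. You could try setting the system property python.options.internalTablesImpl to weak in bootstrap.conf and restarting NiFi.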
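In bootstrap.conf, JVM options are passed as numbered `java.arg.N=...` lines, so the setting above would look roughly like the following. The index 20 is an assumption: any `java.arg` number not already used in your file works, since it only has to be unique.

```properties
# Hypothetical index: pick any java.arg.N number not already present in bootstrap.conf
java.arg.20=-Dpython.options.internalTablesImpl=weak
```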
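As another possible option, since your script is not very long, you could port the code to Groovy, which has many idioms similar to Python's, so hopefully it would not be too complicated.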