使用 Apache Nifi 阅读电子邮件的消息 body
Read message body of an email using Apache Nifi
是否可以使用 Apache Nifi 单步检索电子邮件的 body 内容、电子邮件 header 详细信息和电子邮件附件。
如果是,请帮助我如何实现。
除非您编写自己的处理器或脚本(使用 ExecuteScript 或 InvokeScriptedProcessor),否则单步执行是不可能的。但是,在单个流程中使用类似以下内容是可能的:
ConsumePOP3 -> ExtractEmailHeaders -> ExtractEmailAttachments -> ...
在上述流程结束时,每个附件都有一个流程文件,每个流程文件包含电子邮件 headers 作为属性,附件作为内容。
您可以使用处理器 "ExecuteScript",而不是开发自定义处理器。
import email
import mimetypes
from email.parser import Parser
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processors.script import ExecuteScript
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback
class PyInputStreamCallback(InputStreamCallback):
_text = None
def __init__(self):
pass
def getText(self) :
return self._text
def process(self, inputStream):
self._text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
flowFile = session.get()
if flowFile is not None :
reader = PyInputStreamCallback()
session.read(flowFile, reader)
msg = email.message_from_string(reader.getText())
body = ""
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
if ctype == 'text/plain' and 'attachment' not in cdispo:
body = part.get_payload(decode=True) # decode
break
else:
body = msg.get_payload(decode=True)
flowFile = session.putAttribute(flowFile, 'msgbody', body.decode('utf-8', 'ignore'))
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
截图
是否可以使用 Apache Nifi 单步检索电子邮件的 body 内容、电子邮件 header 详细信息和电子邮件附件。
如果是,请帮助我如何实现。
除非您编写自己的处理器或脚本(使用 ExecuteScript 或 InvokeScriptedProcessor),否则单步执行是不可能的。但是,在单个流程中使用类似以下内容是可能的:
ConsumePOP3 -> ExtractEmailHeaders -> ExtractEmailAttachments -> ...
在上述流程结束时,每个附件都有一个流程文件,每个流程文件包含电子邮件 headers 作为属性,附件作为内容。
您可以使用处理器 "ExecuteScript",而不是开发自定义处理器。
import email
import mimetypes
from email.parser import Parser
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processors.script import ExecuteScript
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback
class PyInputStreamCallback(InputStreamCallback):
_text = None
def __init__(self):
pass
def getText(self) :
return self._text
def process(self, inputStream):
self._text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
flowFile = session.get()
if flowFile is not None :
reader = PyInputStreamCallback()
session.read(flowFile, reader)
msg = email.message_from_string(reader.getText())
body = ""
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
if ctype == 'text/plain' and 'attachment' not in cdispo:
body = part.get_payload(decode=True) # decode
break
else:
body = msg.get_payload(decode=True)
flowFile = session.putAttribute(flowFile, 'msgbody', body.decode('utf-8', 'ignore'))
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
截图