完全流式传输 XML 解析器
Fully streaming XML parser
我正在尝试使用 Exchange GetAttachment webservice using requests, lxml and base64io。此服务 returns SOAP XML HTTP 响应中的 base64 编码文件。文件内容包含在单个 XML 元素的一行中。 GetAttachment
只是一个例子,但问题更普遍。
我想将解码后的文件内容直接流式传输到磁盘,而不会将附件的全部内容随时存储在内存中,因为一个附件可能有几百 MB。
我试过这样的事情:
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
if elem.tag == 't:Content':
b64_encoder = Base64IO(BytesIO(elem.text))
f.write(b64_encoder.read())
但 lxml
仍将附件副本存储为 elem.text
。有什么方法可以创建一个完全流式传输的 XML 解析器,它还可以直接从输入流中流式传输元素的内容吗?
在这种情况下不要使用 iterparse
。 iterparse()
方法只能发出元素开始和结束事件,因此当找到结束 XML 标记时,元素中的任何 text 都会提供给您。
相反,使用 SAX parser interface. This is a general standard for XML parsing libraries, to pass on parsed data to a content handler. The ContentHandler.characters()
callback 以块的形式传递字符数据(假设实现 XML 库实际上利用了这种可能性)。这是 ElementTree API 的较低级别 API,并且 Python 标准库已经捆绑了 Expat 解析器来驱动它。
所以流程变成:
- 将传入的请求流包装在
GzipFile
中以便于解压缩。或者,更好的是,设置 response.raw.decode_content = True
并根据服务器设置的内容编码将解压留给请求库。
- 将
GzipFile
实例或原始流传递给 .parse()
method of a parser created with xml.sax.make_parser()
。然后解析器继续以块的形式从流中读取。通过使用 make_parser()
,您首先可以启用命名空间处理等功能(如果 Exchange 决定更改用于每个命名空间的短前缀,这将确保您的代码不会中断)。
- 使用 XML 数据块调用内容处理程序
characters()
方法;检查正确的元素启动事件,以便您知道何时需要 base64 数据。您可以一次解码 chunks of (a multiple of) 4 characters 中的 base64 数据,并将其写入文件。我不会在这里使用 base64io
,只是做你自己的分块。
一个简单的内容处理器可以是:
from xml.sax import handler
from base64 import b64decode
class AttachmentContentHandler(handler.ContentHandler):
types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'
def __init__(self, filename):
self.filename = filename
def startDocument(self):
self._buffer = None
self._file = None
def startElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# we can expect base64 data next
self._file = open(self.filename, 'wb')
self._buffer = []
def endElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# all attachment data received, close the file
try:
if self._buffer:
raise ValueError("Incomplete Base64 data")
finally:
self._file.close()
self._file = self._buffer = None
def characters(self, data):
if self._buffer is None:
return
self._buffer.append(data)
self._decode_buffer()
def _decode_buffer(self):
remainder = ''
for data in self._buffer:
available = len(remainder) + len(data)
overflow = available % 4
if remainder:
data = (remainder + data)
remainder = ''
if overflow:
remainder, data = data[-overflow:], data[:-overflow]
if data:
self._file.write(b64decode(data))
self._buffer = [remainder] if remainder else []
你会像这样使用它:
import requests
from xml.sax import make_parser, handler
parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True # if content-encoding is used, decompress as we read
parser.parse(r.raw)
这将以最大 64KB 的块(默认 IncrementalParser
buffer size)解析输入 XML,因此附件数据最多解码为 48KB 的原始数据块。
我可能会扩展内容处理程序以获取目标目录,然后查找 <t:Name>
元素来提取文件名,然后使用它来将数据提取到找到的每个附件的正确文件名中。您还想验证您实际上是在处理 GetAttachmentResponse
文档,并处理错误响应。
我正在尝试使用 Exchange GetAttachment webservice using requests, lxml and base64io。此服务 returns SOAP XML HTTP 响应中的 base64 编码文件。文件内容包含在单个 XML 元素的一行中。 GetAttachment
只是一个例子,但问题更普遍。
我想将解码后的文件内容直接流式传输到磁盘,而不会将附件的全部内容随时存储在内存中,因为一个附件可能有几百 MB。
我试过这样的事情:
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
if elem.tag == 't:Content':
b64_encoder = Base64IO(BytesIO(elem.text))
f.write(b64_encoder.read())
但 lxml
仍将附件副本存储为 elem.text
。有什么方法可以创建一个完全流式传输的 XML 解析器,它还可以直接从输入流中流式传输元素的内容吗?
在这种情况下不要使用 iterparse
。 iterparse()
方法只能发出元素开始和结束事件,因此当找到结束 XML 标记时,元素中的任何 text 都会提供给您。
相反,使用 SAX parser interface. This is a general standard for XML parsing libraries, to pass on parsed data to a content handler. The ContentHandler.characters()
callback 以块的形式传递字符数据(假设实现 XML 库实际上利用了这种可能性)。这是 ElementTree API 的较低级别 API,并且 Python 标准库已经捆绑了 Expat 解析器来驱动它。
所以流程变成:
- 将传入的请求流包装在
GzipFile
中以便于解压缩。或者,更好的是,设置response.raw.decode_content = True
并根据服务器设置的内容编码将解压留给请求库。 - 将
GzipFile
实例或原始流传递给.parse()
method of a parser created withxml.sax.make_parser()
。然后解析器继续以块的形式从流中读取。通过使用make_parser()
,您首先可以启用命名空间处理等功能(如果 Exchange 决定更改用于每个命名空间的短前缀,这将确保您的代码不会中断)。 - 使用 XML 数据块调用内容处理程序
characters()
方法;检查正确的元素启动事件,以便您知道何时需要 base64 数据。您可以一次解码 chunks of (a multiple of) 4 characters 中的 base64 数据,并将其写入文件。我不会在这里使用base64io
,只是做你自己的分块。
一个简单的内容处理器可以是:
from xml.sax import handler
from base64 import b64decode
class AttachmentContentHandler(handler.ContentHandler):
types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'
def __init__(self, filename):
self.filename = filename
def startDocument(self):
self._buffer = None
self._file = None
def startElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# we can expect base64 data next
self._file = open(self.filename, 'wb')
self._buffer = []
def endElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# all attachment data received, close the file
try:
if self._buffer:
raise ValueError("Incomplete Base64 data")
finally:
self._file.close()
self._file = self._buffer = None
def characters(self, data):
if self._buffer is None:
return
self._buffer.append(data)
self._decode_buffer()
def _decode_buffer(self):
remainder = ''
for data in self._buffer:
available = len(remainder) + len(data)
overflow = available % 4
if remainder:
data = (remainder + data)
remainder = ''
if overflow:
remainder, data = data[-overflow:], data[:-overflow]
if data:
self._file.write(b64decode(data))
self._buffer = [remainder] if remainder else []
你会像这样使用它:
import requests
from xml.sax import make_parser, handler
parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True # if content-encoding is used, decompress as we read
parser.parse(r.raw)
这将以最大 64KB 的块(默认 IncrementalParser
buffer size)解析输入 XML,因此附件数据最多解码为 48KB 的原始数据块。
我可能会扩展内容处理程序以获取目标目录,然后查找 <t:Name>
元素来提取文件名,然后使用它来将数据提取到找到的每个附件的正确文件名中。您还想验证您实际上是在处理 GetAttachmentResponse
文档,并处理错误响应。