在不将整个文件加载到内存的情况下反序列化消息?

Deserializing messages without loading entire file into memory?

我正在使用 Google Protocol Buffers 和 Python 来解码一些大数据文件 -- 每个文件 200MB。我在下面有一些代码显示了如何解码分隔流并且它工作得很好。但是,它使用 read() 命令将整个文件加载到内存中,然后对其进行迭代。

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
    n = 0
    while n < len(buf):
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        msg_buf = buf[n:n+msg_len]
        n += msg_len
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        # do something with read_metric
        print(read_row)

请注意,此代码来自另一个 SO post,但我不记得确切的 url。我想知道是否有一个 readlines() 与协议缓冲区等效,允许我一次读入一条分隔消息并对其进行解码?我基本上想要一个不受 RAM 限制的管道,我必须加载文件。

似乎有一个 pystream-protobuf 包支持其中的一些功能,但它已经一两年没有更新了。还有一个7年前的post问过类似的问题。不过我想知道从那以后有没有什么新的信息。

python example for reading multiple protobuf messages from a stream

如果可以一次加载一条完整的消息,则通过修改您发布的代码实现起来非常简单:

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read(10) # Maximum length of length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, 0)
        buf = buf[new_pos:]
        # read rest of the message
        buf += f.read(msg_len - len(buf))
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(buf)
        buf = buf[msg_len:]
        # do something with read_metric
        print(read_row)
        # read length prefix for next message
        buf += f.read(10 - len(buf))

这会读取 10 个字节,足以解析长度前缀,然后在知道长度后读取消息的其余部分。

字符串突变在 Python 中效率不高(它们会复制大量数据),因此如果您的单个消息也很大,使用 bytearray 可以提高性能。

https://github.com/cartoonist/pystream-protobuf/ 于 6 个月前更新。到目前为止我还没有对它进行太多测试,但它似乎不需要更新就可以正常工作。它提供可选的 gzip 和异步。