在不将整个文件加载到内存的情况下反序列化消息?
Deserializing messages without loading entire file into memory?
我正在使用 Google Protocol Buffers 和 Python 来解码一些大数据文件 -- 每个文件 200MB。我在下面有一些代码显示了如何解码分隔流并且它工作得很好。但是,它使用 read()
命令将整个文件加载到内存中,然后对其进行迭代。
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
n = 0
while n < len(buf):
msg_len, new_pos = _DecodeVarint32(buf, n)
n = new_pos
msg_buf = buf[n:n+msg_len]
n += msg_len
read_row = sfeed.standard_feed()
read_row.ParseFromString(msg_buf)
# do something with read_metric
print(read_row)
请注意,此代码来自另一个 SO post,但我不记得确切的 url。我想知道是否有一个 readlines()
与协议缓冲区等效,允许我一次读入一条分隔消息并对其进行解码?我基本上想要一个不受 RAM 限制的管道,我必须加载文件。
似乎有一个 pystream-protobuf
包支持其中的一些功能,但它已经一两年没有更新了。还有一个7年前的post问过类似的问题。不过我想知道从那以后有没有什么新的信息。
python example for reading multiple protobuf messages from a stream
如果可以一次加载一条完整的消息,则通过修改您发布的代码实现起来非常简单:
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read(10) # Maximum length of length prefix
while buf:
msg_len, new_pos = _DecodeVarint32(buf, 0)
buf = buf[new_pos:]
# read rest of the message
buf += f.read(msg_len - len(buf))
read_row = sfeed.standard_feed()
read_row.ParseFromString(buf)
buf = buf[msg_len:]
# do something with read_metric
print(read_row)
# read length prefix for next message
buf += f.read(10 - len(buf))
这会读取 10 个字节,足以解析长度前缀,然后在知道长度后读取消息的其余部分。
字符串突变在 Python 中效率不高(它们会复制大量数据),因此如果您的单个消息也很大,使用 bytearray
可以提高性能。
https://github.com/cartoonist/pystream-protobuf/ 于 6 个月前更新。到目前为止我还没有对它进行太多测试,但它似乎不需要更新就可以正常工作。它提供可选的 gzip 和异步。
我正在使用 Google Protocol Buffers 和 Python 来解码一些大数据文件 -- 每个文件 200MB。我在下面有一些代码显示了如何解码分隔流并且它工作得很好。但是,它使用 read()
命令将整个文件加载到内存中,然后对其进行迭代。
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
n = 0
while n < len(buf):
msg_len, new_pos = _DecodeVarint32(buf, n)
n = new_pos
msg_buf = buf[n:n+msg_len]
n += msg_len
read_row = sfeed.standard_feed()
read_row.ParseFromString(msg_buf)
# do something with read_metric
print(read_row)
请注意,此代码来自另一个 SO post,但我不记得确切的 url。我想知道是否有一个 readlines()
与协议缓冲区等效,允许我一次读入一条分隔消息并对其进行解码?我基本上想要一个不受 RAM 限制的管道,我必须加载文件。
似乎有一个 pystream-protobuf
包支持其中的一些功能,但它已经一两年没有更新了。还有一个7年前的post问过类似的问题。不过我想知道从那以后有没有什么新的信息。
python example for reading multiple protobuf messages from a stream
如果可以一次加载一条完整的消息,则通过修改您发布的代码实现起来非常简单:
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read(10) # Maximum length of length prefix
while buf:
msg_len, new_pos = _DecodeVarint32(buf, 0)
buf = buf[new_pos:]
# read rest of the message
buf += f.read(msg_len - len(buf))
read_row = sfeed.standard_feed()
read_row.ParseFromString(buf)
buf = buf[msg_len:]
# do something with read_metric
print(read_row)
# read length prefix for next message
buf += f.read(10 - len(buf))
这会读取 10 个字节,足以解析长度前缀,然后在知道长度后读取消息的其余部分。
字符串突变在 Python 中效率不高(它们会复制大量数据),因此如果您的单个消息也很大,使用 bytearray
可以提高性能。
https://github.com/cartoonist/pystream-protobuf/ 于 6 个月前更新。到目前为止我还没有对它进行太多测试,但它似乎不需要更新就可以正常工作。它提供可选的 gzip 和异步。