是否可以使用 Varint32 大小前缀的 Protocol Buffer 消息实现类似 'FileInputStream::BackUp()' 的功能?
Is it possible to implement 'FileInputStream::BackUp()'-like functionality with Varint32 size-prefixed Protocol Buffer messages?
我正在尝试使用 readDelimitedFrom()
的 following implementation 在 C++ 中解析分隔的 protobuf 消息(来自文件)- 也复制如下:
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) {
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
我的问题是我需要根据消息中包含的 uint32_t
字段对消息进行分组并分批处理它们 - 让我们称之为 id
.
目前,我的主循环中有以下代码:
...
int infd = -1;
_sopen_s(&infd, argv[1], _O_RDONLY | _O_BINARY, _SH_DENYWR, _S_IREAD);
google::protobuf::io::ZeroCopyInputStream *input =
new google::protobuf::io::FileInputStream(infd);
std::vector<ProtoMessage> msgList;
bool readMore = true;
do {
ProtoMessage msg;
readMore = readNextMessage(input, msg, msgList);
if (!msgList.empty()) {
std::cout << "Processing Message Batch - ID: " << msgList[0].id();
/* some processing done here */
}
} while (readMore);
readNextMessage()
的实现如下:
bool readNextMessage(
google::protobuf::io::ZeroCopyInputStream* rawInput,
ProtoMessage& nextMsg,
std::vector<ProtoMessage>& batchList) {
bool sameBatch = false;
uint32_t msgID = 0;
do {
if (readDelimitedFrom(rawInput, &scan) == -1)
return false;
if (nextMsg.id() == 0)
msgID = nextMsg.id(); // guaranteed to be non-zero
if (sameBatch = (msgID == nextMsg.id()))
batchList.push_back(nextMsg);
} while (sameBatch);
// need a way to roll-back here as nextMsg is now the first new
// ProtoMessage belonging to a new batch.
return true;
}
这个函数的逻辑相当简单:取一个 ZeroCopyInputStream
并使用 readDelimitedFrom()
解析它,根据它们的 id
字段将 ProtoMessage
消息分组到一个向量中.如果遇到具有新 ID 的消息,则停止并将 return 控制权返回到 main
以处理消息批处理。
这导致了不希望的要求,即必须 consume/read 第一条消息(包括其 Varint32 编码的大小) 不 属于前一批而没有'backup' 流的一种方式。我希望能够将 ZeroCopyInputStream
指向最后一个 readDelimitedFrom()
.
之前的位置
有什么方法可以修改 readDelimitedFrom()
使其也 return 调用期间消耗的字节数,然后在 ZeroCopyInputStream
上使用指针算法来实现所需的功能?
提供的函数 ZeroCopyInputStream::Backup()
有一个先决条件,即 ZeroCopyInputStream::Next()
是最后一个方法调用。显然,使用 CodedInputStream
包装器解析定界消息时情况并非如此。
ZeroCopyInputStream::Backup()
只能备份收到的最后一个 缓冲区 。一条消息可能跨越多个缓冲区,因此在给定 ZeroCopyInputStream
接口的情况下,没有通用的方法来执行您想要的操作。
部分选项:
- 在解析每条消息之前调用
rawInput->ByteCount()
,以便准确确定消息开始的字节位置。如果您需要回滚,请向后查找基础文件并在其顶部重新创建 ZeroCopyInputStream
。当然,这只有在您从文件中读取时才有效。
- 当您在新的批次中遇到消息时,将其保存到一边,然后在来电者要求开始阅读下一批次时将其取回。
我正在尝试使用 readDelimitedFrom()
的 following implementation 在 C++ 中解析分隔的 protobuf 消息(来自文件)- 也复制如下:
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) {
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
我的问题是我需要根据消息中包含的 uint32_t
字段对消息进行分组并分批处理它们 - 让我们称之为 id
.
目前,我的主循环中有以下代码:
...
int infd = -1;
_sopen_s(&infd, argv[1], _O_RDONLY | _O_BINARY, _SH_DENYWR, _S_IREAD);
google::protobuf::io::ZeroCopyInputStream *input =
new google::protobuf::io::FileInputStream(infd);
std::vector<ProtoMessage> msgList;
bool readMore = true;
do {
ProtoMessage msg;
readMore = readNextMessage(input, msg, msgList);
if (!msgList.empty()) {
std::cout << "Processing Message Batch - ID: " << msgList[0].id();
/* some processing done here */
}
} while (readMore);
readNextMessage()
的实现如下:
bool readNextMessage(
google::protobuf::io::ZeroCopyInputStream* rawInput,
ProtoMessage& nextMsg,
std::vector<ProtoMessage>& batchList) {
bool sameBatch = false;
uint32_t msgID = 0;
do {
if (readDelimitedFrom(rawInput, &scan) == -1)
return false;
if (nextMsg.id() == 0)
msgID = nextMsg.id(); // guaranteed to be non-zero
if (sameBatch = (msgID == nextMsg.id()))
batchList.push_back(nextMsg);
} while (sameBatch);
// need a way to roll-back here as nextMsg is now the first new
// ProtoMessage belonging to a new batch.
return true;
}
这个函数的逻辑相当简单:取一个 ZeroCopyInputStream
并使用 readDelimitedFrom()
解析它,根据它们的 id
字段将 ProtoMessage
消息分组到一个向量中.如果遇到具有新 ID 的消息,则停止并将 return 控制权返回到 main
以处理消息批处理。
这导致了不希望的要求,即必须 consume/read 第一条消息(包括其 Varint32 编码的大小) 不 属于前一批而没有'backup' 流的一种方式。我希望能够将 ZeroCopyInputStream
指向最后一个 readDelimitedFrom()
.
有什么方法可以修改 readDelimitedFrom()
使其也 return 调用期间消耗的字节数,然后在 ZeroCopyInputStream
上使用指针算法来实现所需的功能?
提供的函数 ZeroCopyInputStream::Backup()
有一个先决条件,即 ZeroCopyInputStream::Next()
是最后一个方法调用。显然,使用 CodedInputStream
包装器解析定界消息时情况并非如此。
ZeroCopyInputStream::Backup()
只能备份收到的最后一个 缓冲区 。一条消息可能跨越多个缓冲区,因此在给定 ZeroCopyInputStream
接口的情况下,没有通用的方法来执行您想要的操作。
部分选项:
- 在解析每条消息之前调用
rawInput->ByteCount()
,以便准确确定消息开始的字节位置。如果您需要回滚,请向后查找基础文件并在其顶部重新创建ZeroCopyInputStream
。当然,这只有在您从文件中读取时才有效。 - 当您在新的批次中遇到消息时,将其保存到一边,然后在来电者要求开始阅读下一批次时将其取回。