Protobuf 导致 ParseFromIstream 上的分段错误
Protobuf causes segmentation fault on ParseFromIstream
我正在尝试扩展我的编程知识,我正在尝试进行一些多进程编程。
我想执行以下操作:在同一台主机上,多个可执行文件是 运行。其中一个可执行文件负责扫描文件系统,其中一个可执行文件负责处理数据等
但是,某些数据必须脱离主机传输。为了限制网络防火墙设置之类的东西,我希望有一个守护进程(多线程)通过 IPC 接收数据,然后再使用尚未确定的套接字实现将其发送到外部主机。
经过大量搜索和研究,最明显的使用模式是消费者/生产者模式,具有多进程生产者(守护进程产生消息)和多线程消费者(接收数据,最好通过共享内存,并将其发送到外部主机)。
我希望我的应用程序能够 运行 尽可能地跨平台。为此,我正在使用 boost::interprocess:message_queue。因为这个 Boost 库只接受二进制序列化对象,所以我使用 Google Protobuf 来处理序列化和反序列化。
我创建了 2 个可执行文件,当前称为 "consumer" 和 "producer"。生产者通过消息队列将消息发送给消费者,消费者反序列化它。下面的代码在传递简单的 "int" 对象时有效(在我看来,这意味着消息队列通信正常),但在使用来自 SerializeToOstream().
的数据时无效
您可能已经注意到,我是 IPC 和多进程编程的新手,但我相信我已经完成了功课。
这是我的 producer.cpp:
#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Construct the object to be passed
GOOGLE_PROTOBUF_VERIFY_VERSION;
struct protoremove {
~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
} remover;
ib::protobuf::testMessage myMessage;
myMessage.set_id(10);
myMessage.set_version(1);
std::cout << myMessage.DebugString() << std::endl;
// Initialize the Boost message queue
try{
//Open a message queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
}
catch(boost::interprocess::interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
consumer.cpp:
#include <iostream>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Open the message queue
try {
//Erase previous message queue
boost::interprocess::message_queue::remove("message_queue");
ib::protobuf::testMessage recvdMessage;
//Create a message_queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
std::ifstream incomingbuf;
mq.receive(&incomingbuf, 1000, recvd_size, priority);
recvdMessage.ParseFromIstream(&incomingbuf);
recvdMessage.id();
recvdMessage.DebugString();
}
catch(boost::interprocess::interprocess_exception &ex){
boost::interprocess::message_queue::remove("message_queue");
std::cout << "IP error " << ex.what() << std::endl;
return 1;
}
boost::interprocess::message_queue::remove("message_queue");
return 0;
}
以及消息定义(.proto):
package ib.protobuf;
message testMessage {
required int32 version = 1;
optional int64 id = 2;
optional string data = 3;
optional int64 sequencenumber = 4;
}
当 运行ning 消费者时,它等待数据(mq.receive() 调用被阻塞)。
当生产者启动时,消费者得到一个 SIGSEGV。 gdb 在其回溯中指出,这发生在第 44 行,即 ParseFromIstream() 方法。
生产者在 DebugString() 中输出正确的值。
(gdb) r
Starting program: /home/roel/bin/consumer
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50 /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0 std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1 0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380,
__s=0x637d20 "", __n=8192)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2 0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3 0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4 0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
from /usr/lib/libprotobuf.so.9
#5 0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6 0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7 0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
at /home/roel/source/consumer.cpp:44
(gdb)
这是使用 CMake 和 GCC 6.0.1 在 Linux 上编译的。
我对我的程序有很多问题:
Q1.
首先也是最重要的; 什么可能导致分段错误?
我究竟做错了什么?我已经查看这段代码好几个小时了,但看不出问题所在。
Q2.
中boost::interprocess::message_queue
构造函数,我必须定义 2 个参数;最大数量
消息,以及大小。对于标准类型,此尺寸为
固定的。但是,对于消息(一般而言),消息的大小
是可变的。那么,确定金额的最佳方法是什么
为消息保留多少内存? 我应该简单地设置一个最大值
每条消息的大小并创建一些多部分消息参数?
Q3.
有没有更好的方法来实现我的目标? 序列化数据,
将它放入队列中似乎是如此..复杂,尤其是
看到这可能是一个非常普遍的问题。必须有更多
人们试图创建跨平台的 IPC。像 ZeroMQ 这样的库
只支持 UNIX 域套接字。使用 TCP 套接字进行环回
界面看起来很难看。难道没有一个图书馆可以让我
将任意对象(大小和布局)作为消息放在共享中
内存段,消费者可以 pop()
?我的意思是,在一个
单线程,这可以通过堆栈上的 push()
和 pop()
来解决。
执行所有这些额外的步骤似乎开销很大。
提前感谢您的回复。
编辑
正如 The Dark 所说,上面的代码使用了 std::string
的实例而不是实际的字符串 (std::string.data())
producer.cpp回答如下:
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
但是,这对 consumer.cpp 不起作用,因为字符串的大小已初始化为 0。
这是我用于 consumer.cpp:
的代码
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);
ib::protobuf::testMessage recvdMessage;
//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;
//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());
std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();
这部分制作人好像说错了
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
ofstream
尚未打开,因此没有可存储任何内容的文件,因此第一次调用将失败(不是崩溃)。 send 调用正在跨行发送原始 ofstream
class 结构。这将不是可传输的格式。
我想你想要的是序列化为一个 ostringstream,然后传输 ostringstream 的内容(而不是整个对象)。
类似于:
// Send our message
std::ostringstream buftosend;
myMessage.SerializeToOstream(&buftosend);
std::string str = buftosend.str();
mq.send(str.data(), str.size(), 1);
或者更好:
// Send our message
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
您还可以添加调试行以显示 str
的内容,但请注意它是二进制的,因此不易读。
您的消费者可能有类似的问题(ifstream 需要在文件上打开)。
A3:
ZeroMQ
就ZeroMQ
而言,有许多不同的 传输-class可以同时使用。因此,如果希望使用本地线程间信号的最低开销,我们可以使用 inproc://
transport-class,如果是本地进程间,可以 .bind()
/ .connect()
使用 ipc://
transport-class。对于跨平台分布式处理,tcp://
或 pgm://
或 epgm://
transport-classes 使选择与系统和网络功能保持一致的通信需求变得容易。
nanomsg
另一个智能轻量级无代理消息/信令框架nanomsg
来自ZeroMQ
的共同父亲Martin SUSTRIK。同样,INPROC
、IPC
和 TCP
, 传输 classes 已准备就绪。 Definitely worth a few minutes to read his insightfull remarks on this subject.
然而,由于合理的假设消息传递代理不知道什么系统站在 messaging/signalling 套接字的远程端,因此使用这些框架依赖于某种对象表示——可以称为序列化、容器化对象、对象包装器,只需负责 "prepare" 将自己希望的对象转换为可传输的方式,以便分发和远程重新安装。
如所要求的那样,使用通用的共享内存设计进入其他架构,而 ZeroMQ
inproc://
和 nanomsg
INPROC
transport-classes 是 Zero-Copy almost-Zero-Latency 这个一般想法的例子。
我正在尝试扩展我的编程知识,我正在尝试进行一些多进程编程。
我想执行以下操作:在同一台主机上,多个可执行文件是 运行。其中一个可执行文件负责扫描文件系统,其中一个可执行文件负责处理数据等
但是,某些数据必须脱离主机传输。为了限制网络防火墙设置之类的东西,我希望有一个守护进程(多线程)通过 IPC 接收数据,然后再使用尚未确定的套接字实现将其发送到外部主机。
经过大量搜索和研究,最明显的使用模式是消费者/生产者模式,具有多进程生产者(守护进程产生消息)和多线程消费者(接收数据,最好通过共享内存,并将其发送到外部主机)。
我希望我的应用程序能够 运行 尽可能地跨平台。为此,我正在使用 boost::interprocess:message_queue。因为这个 Boost 库只接受二进制序列化对象,所以我使用 Google Protobuf 来处理序列化和反序列化。
我创建了 2 个可执行文件,当前称为 "consumer" 和 "producer"。生产者通过消息队列将消息发送给消费者,消费者反序列化它。下面的代码在传递简单的 "int" 对象时有效(在我看来,这意味着消息队列通信正常),但在使用来自 SerializeToOstream().
的数据时无效您可能已经注意到,我是 IPC 和多进程编程的新手,但我相信我已经完成了功课。
这是我的 producer.cpp:
#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Construct the object to be passed
GOOGLE_PROTOBUF_VERIFY_VERSION;
struct protoremove {
~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
} remover;
ib::protobuf::testMessage myMessage;
myMessage.set_id(10);
myMessage.set_version(1);
std::cout << myMessage.DebugString() << std::endl;
// Initialize the Boost message queue
try{
//Open a message queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
}
catch(boost::interprocess::interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
consumer.cpp:
#include <iostream>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Open the message queue
try {
//Erase previous message queue
boost::interprocess::message_queue::remove("message_queue");
ib::protobuf::testMessage recvdMessage;
//Create a message_queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
std::ifstream incomingbuf;
mq.receive(&incomingbuf, 1000, recvd_size, priority);
recvdMessage.ParseFromIstream(&incomingbuf);
recvdMessage.id();
recvdMessage.DebugString();
}
catch(boost::interprocess::interprocess_exception &ex){
boost::interprocess::message_queue::remove("message_queue");
std::cout << "IP error " << ex.what() << std::endl;
return 1;
}
boost::interprocess::message_queue::remove("message_queue");
return 0;
}
以及消息定义(.proto):
package ib.protobuf;
message testMessage {
required int32 version = 1;
optional int64 id = 2;
optional string data = 3;
optional int64 sequencenumber = 4;
}
当 运行ning 消费者时,它等待数据(mq.receive() 调用被阻塞)。 当生产者启动时,消费者得到一个 SIGSEGV。 gdb 在其回溯中指出,这发生在第 44 行,即 ParseFromIstream() 方法。 生产者在 DebugString() 中输出正确的值。
(gdb) r
Starting program: /home/roel/bin/consumer
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50 /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0 std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1 0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380,
__s=0x637d20 "", __n=8192)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2 0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3 0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4 0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
from /usr/lib/libprotobuf.so.9
#5 0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6 0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7 0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
at /home/roel/source/consumer.cpp:44
(gdb)
这是使用 CMake 和 GCC 6.0.1 在 Linux 上编译的。 我对我的程序有很多问题:
Q1.
首先也是最重要的; 什么可能导致分段错误?
我究竟做错了什么?我已经查看这段代码好几个小时了,但看不出问题所在。
Q2.
中boost::interprocess::message_queue
构造函数,我必须定义 2 个参数;最大数量
消息,以及大小。对于标准类型,此尺寸为
固定的。但是,对于消息(一般而言),消息的大小
是可变的。那么,确定金额的最佳方法是什么
为消息保留多少内存? 我应该简单地设置一个最大值
每条消息的大小并创建一些多部分消息参数?
Q3.
有没有更好的方法来实现我的目标? 序列化数据,
将它放入队列中似乎是如此..复杂,尤其是
看到这可能是一个非常普遍的问题。必须有更多
人们试图创建跨平台的 IPC。像 ZeroMQ 这样的库
只支持 UNIX 域套接字。使用 TCP 套接字进行环回
界面看起来很难看。难道没有一个图书馆可以让我
将任意对象(大小和布局)作为消息放在共享中
内存段,消费者可以 pop()
?我的意思是,在一个
单线程,这可以通过堆栈上的 push()
和 pop()
来解决。
执行所有这些额外的步骤似乎开销很大。
提前感谢您的回复。
编辑
正如 The Dark 所说,上面的代码使用了 std::string
的实例而不是实际的字符串 (std::string.data())
producer.cpp回答如下:
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
但是,这对 consumer.cpp 不起作用,因为字符串的大小已初始化为 0。
这是我用于 consumer.cpp:
的代码unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);
ib::protobuf::testMessage recvdMessage;
//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;
//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());
std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();
这部分制作人好像说错了
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
ofstream
尚未打开,因此没有可存储任何内容的文件,因此第一次调用将失败(不是崩溃)。 send 调用正在跨行发送原始 ofstream
class 结构。这将不是可传输的格式。
我想你想要的是序列化为一个 ostringstream,然后传输 ostringstream 的内容(而不是整个对象)。
类似于:
// Send our message
std::ostringstream buftosend;
myMessage.SerializeToOstream(&buftosend);
std::string str = buftosend.str();
mq.send(str.data(), str.size(), 1);
或者更好:
// Send our message
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
您还可以添加调试行以显示 str
的内容,但请注意它是二进制的,因此不易读。
您的消费者可能有类似的问题(ifstream 需要在文件上打开)。
A3:
ZeroMQ
就ZeroMQ
而言,有许多不同的 传输-class可以同时使用。因此,如果希望使用本地线程间信号的最低开销,我们可以使用 inproc://
transport-class,如果是本地进程间,可以 .bind()
/ .connect()
使用 ipc://
transport-class。对于跨平台分布式处理,tcp://
或 pgm://
或 epgm://
transport-classes 使选择与系统和网络功能保持一致的通信需求变得容易。
nanomsg
另一个智能轻量级无代理消息/信令框架nanomsg
来自ZeroMQ
的共同父亲Martin SUSTRIK。同样,INPROC
、IPC
和 TCP
, 传输 classes 已准备就绪。 Definitely worth a few minutes to read his insightfull remarks on this subject.
然而,由于合理的假设消息传递代理不知道什么系统站在 messaging/signalling 套接字的远程端,因此使用这些框架依赖于某种对象表示——可以称为序列化、容器化对象、对象包装器,只需负责 "prepare" 将自己希望的对象转换为可传输的方式,以便分发和远程重新安装。
如所要求的那样,使用通用的共享内存设计进入其他架构,而 ZeroMQ
inproc://
和 nanomsg
INPROC
transport-classes 是 Zero-Copy almost-Zero-Latency 这个一般想法的例子。