Protobuf 导致 ParseFromIstream 上的分段错误

Protobuf causes segmentation fault on ParseFromIstream

我正在尝试扩展我的编程知识,我正在尝试进行一些多进程编程。

我想执行以下操作:在同一台主机上,多个可执行文件是 运行。其中一个可执行文件负责扫描文件系统,其中一个可执行文件负责处理数据等

但是,某些数据必须脱离主机传输。为了限制网络防火墙设置之类的东西,我希望有一个守护进程(多线程)通过 IPC 接收数据,然后再使用尚未确定的套接字实现将其发送到外部主机。

经过大量搜索和研究,最明显的使用模式是消费者/生产者模式,具有多进程生产者(守护进程产生消息)和多线程消费者(接收数据,最好通过共享内存,并将其发送到外部主机)。

我希望我的应用程序能够 运行 尽可能地跨平台。为此,我正在使用 boost::interprocess:message_queue。因为这个 Boost 库只接受二进制序列化对象,所以我使用 Google Protobuf 来处理序列化和反序列化。

我创建了 2 个可执行文件,当前称为 "consumer" 和 "producer"。生产者通过消息队列将消息发送给消费者,消费者反序列化它。下面的代码在传递简单的 "int" 对象时有效(在我看来,这意味着消息队列通信正常),但在使用来自 SerializeToOstream().

的数据时无效

您可能已经注意到,我是 IPC 和多进程编程的新手,但我相信我已经完成了功课。

这是我的 producer.cpp:

#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
    // Construct the object to be passed
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    struct protoremove {
        ~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
    } remover;

    ib::protobuf::testMessage myMessage;
    myMessage.set_id(10);
    myMessage.set_version(1);
    std::cout << myMessage.DebugString() << std::endl;

    // Initialize the Boost message queue
    try{
        //Open a message queue.
        boost::interprocess::message_queue mq
                (boost::interprocess::open_or_create
                        ,"message_queue"           //name
                        ,100                       //max message number
                        ,1000               //max message size
                );

        // Send our message
        std::ofstream buftosend;
        myMessage.SerializeToOstream(&buftosend);
        mq.send(&buftosend, sizeof(buftosend), 1);

    }
    catch(boost::interprocess::interprocess_exception &ex){
        std::cout << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

consumer.cpp:

#include <iostream>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
    // Open the message queue
    try {
        //Erase previous message queue
        boost::interprocess::message_queue::remove("message_queue");
        ib::protobuf::testMessage recvdMessage;

        //Create a message_queue.
        boost::interprocess::message_queue mq
                (boost::interprocess::open_or_create
                        ,"message_queue"           //name
                        ,100                       //max message number
                        ,1000               //max message size
                );

        unsigned int priority;
        boost::interprocess::message_queue::size_type recvd_size;

        std::ifstream incomingbuf;
        mq.receive(&incomingbuf, 1000, recvd_size, priority);

        recvdMessage.ParseFromIstream(&incomingbuf);

        recvdMessage.id();
        recvdMessage.DebugString();
    }
    catch(boost::interprocess::interprocess_exception &ex){
        boost::interprocess::message_queue::remove("message_queue");
        std::cout << "IP error " << ex.what() << std::endl;
        return 1;
    }
    boost::interprocess::message_queue::remove("message_queue");
    return 0;

}

以及消息定义(.proto):

package ib.protobuf;

message testMessage {
    required int32 version = 1;
    optional int64 id = 2;
    optional string data = 3;
    optional int64 sequencenumber = 4;
}

当 运行ning 消费者时,它等待数据(mq.receive() 调用被阻塞)。 当生产者启动时,消费者得到一个 SIGSEGV。 gdb 在其回溯中指出,这发生在第 44 行,即 ParseFromIstream() 方法。 生产者在 DebugString() 中输出正确的值。

(gdb) r
Starting program: /home/roel/bin/consumer 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50  /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0  std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1  0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380, 
    __s=0x637d20 "", __n=8192)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2  0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3  0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4  0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
   from /usr/lib/libprotobuf.so.9
#5  0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6  0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7  0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
    at /home/roel/source/consumer.cpp:44
(gdb) 

这是使用 CMake 和 GCC 6.0.1 在 Linux 上编译的。 我对我的程序有很多问题:


Q1. 首先也是最重要的; 什么可能导致分段错误?
我究竟做错了什么?我已经查看这段代码好几个小时了,但看不出问题所在。


Q2.中boost::interprocess::message_queue 构造函数,我必须定义 2 个参数;最大数量 消息,以及大小。对于标准类型,此尺寸为 固定的。但是,对于消息(一般而言),消息的大小 是可变的。那么,确定金额的最佳方法是什么 为消息保留多少内存? 我应该简单地设置一个最大值 每条消息的大小并创建一些多部分消息参数?


Q3. 有没有更好的方法来实现我的目标? 序列化数据, 将它放入队列中似乎是如此..复杂,尤其是 看到这可能是一个非常普遍的问题。必须有更多 人们试图创建跨平台的 IPC。像 ZeroMQ 这样的库 只支持 UNIX 域套接字。使用 TCP 套接字进行环回 界面看起来很难看。难道没有一个图书馆可以让我 将任意对象(大小和布局)作为消息放在共享中 内存段,消费者可以 pop()?我的意思是,在一个 单线程,这可以通过堆栈上的 push()pop() 来解决。 执行所有这些额外的步骤似乎开销很大。

提前感谢您的回复。


编辑

正如 The Dark 所说,上面的代码使用了 std::string 的实例而不是实际的字符串 (std::string.data())

producer.cpp回答如下:

std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1); 

但是,这对 consumer.cpp 不起作用,因为字符串的大小已初始化为 0。

这是我用于 consumer.cpp:

的代码
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;

//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);

ib::protobuf::testMessage recvdMessage;

//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;

//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());

std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();

这部分制作人好像说错了

    // Send our message
    std::ofstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    mq.send(&buftosend, sizeof(buftosend), 1);

ofstream 尚未打开,因此没有可存储任何内容的文件,因此第一次调用将失败(不是崩溃)。 send 调用正在跨行发送原始 ofstream class 结构。这将不是可传输的格式。

我想你想要的是序列化为一个 ostringstream,然后传输 ostringstream 的内容(而不是整个对象)。

类似于:

    // Send our message
    std::ostringstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    std::string str = buftosend.str();
    mq.send(str.data(), str.size(), 1); 

或者更好:

    // Send our message
    std::string str = myMessage.SerializeAsString();
    mq.send(str.data(), str.size(), 1); 

您还可以添加调试行以显示 str 的内容,但请注意它是二进制的,因此不易读。

您的消费者可能有类似的问题(ifstream 需要在文件上打开)。

A3:

ZeroMQ

ZeroMQ而言,有许多不同的 传输-class可以同时使用。因此,如果希望使用本地线程间信号的最低开销,我们可以使用 inproc:// transport-class,如果是本地进程间,可以 .bind() / .connect() 使用 ipc:// transport-class。对于跨平台分布式处理,tcp://pgm://epgm:// transport-classes 使选择与系统和网络功能保持一致的通信需求变得容易。

( Do not hesitate to check other posts, with also a direct URL to Pieter HINTJENS' book, a must-read for going into distributed systems design )


nanomsg

另一个智能轻量级无代理消息/信令框架nanomsg 来自ZeroMQ 的共同父亲Martin SUSTRIK。同样,INPROCIPCTCP , 传输 classes 已准备就绪。 Definitely worth a few minutes to read his insightfull remarks on this subject.


然而,由于合理的假设消息传递代理不知道什么系统站在 messaging/signalling 套接字的远程端,因此使用这些框架依赖于某种对象表示——可以称为序列化、容器化对象、对象包装器,只需负责 "prepare" 将自己希望的对象转换为可传输的方式,以便分发和远程重新安装。

如所要求的那样,使用通用的共享内存设计进入其他架构,而 ZeroMQ inproc://nanomsg INPROC transport-classes 是 Zero-Copy almost-Zero-Latency 这个一般想法的例子。