0MQ: on passing a context to a pool of threads a Segmentation fault was thrown

I am building a small program (here) in which main() sends the same msg to all worker threads, and each worker simply prints the msg.

I believe I followed this tutorial from the official guide exactly.

I pass the 0MQ context to the threads through a void pointer and then cast it back to a zmq::context_t *, yet I still get a segmentation fault. Here is the core dump loaded in GDB:

Core was generated by `./test 1'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  zmq::ctx_t::check_tag (this=this@entry=0xabadcafe) at src/ctx.cpp:89
89  src/ctx.cpp: No such file or directory.
[Current thread is 1 (Thread 0x7fcdaae3f700 (LWP 1428))]
(gdb) bt
#0  zmq::ctx_t::check_tag (this=this@entry=0xabadcafe) at src/ctx.cpp:89
#1  0x00007fcdad17c275 in zmq_socket (ctx_=0xabadcafe, type_=7) at src/zmq.cpp:245
#2  0x0000000000402da1 in zmq::socket_t::init (this=0x7fcdaae3edf0, context_=..., type_=7) at /usr/include/zmq.hpp:649
#3  0x0000000000402ac3 in zmq::socket_t::socket_t (this=0x7fcdaae3edf0, context_=..., type_=7) at /usr/include/zmq.hpp:463
#4  0x0000000000401f18 in task1 (arg=0x170ac20) at test.cpp:21
#5  0x00000000004050fe in std::_Bind_simple<void (*(void*))(void*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x170e0d8)
    at /usr/include/c++/5/functional:1531
#6  0x0000000000405008 in std::_Bind_simple<void (*(void*))(void*)>::operator()() (this=0x170e0d8) at /usr/include/c++/5/functional:1520
#7  0x0000000000404f98 in std::thread::_Impl<std::_Bind_simple<void (*(void*))(void*)> >::_M_run() (this=0x170e0c0) at /usr/include/c++/5/thread:115
#8  0x00007fcdacc48c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#9  0x00007fcdacf196ba in start_thread (arg=0x7fcdaae3f700) at pthread_create.c:333
#10 0x00007fcdac6b782d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb) quit

The program was compiled with:

g++ -std=c++11 test.cpp -o test -lzmq -lpthread -g
pool.push_back(thread(task1, (void *)context));

should be

pool.push_back(thread(task1, (void *)&context));

because you need to pass the address of context.

In this use case, a thread-less Context() can also help:

The native API recommends this practice for multithreaded designs that use the inproc:// transport class exclusively:

The zmq_init() function initialises a ØMQ context.

The io_threads argument specifies the size of the ØMQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.

Thread safety

A ØMQ context is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller.

This function is deprecated by zmq_ctx_new(3).

The more recent { API / c++ wrapper } moved this step (originally available at instantiation time in the { v2.1.11 API / c++ wrapper } via Context( NUM_io_threads )) into a split process in { v4.2.2 }: a separate settings call made post-instantiation, but before any socket is associated with the context:

zmq_ctx_set( ctx, ZMQ_IO_THREADS, 0 )


#include <zmq.hpp>

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

using namespace std;

void task1( void *arg );                  // worker routine, defined elsewhere
bool has_only_digits( const string &s );  // input validation, defined elsewhere

int main( int argc, char* argv[] )
{   
    int worker_num;
    if (  argc != 2 ) { cout << "1 parameter pls" << endl;                           exit(1); } // EXIT[1]
    try
    {
      if ( !has_only_digits( string( argv[1] ) ) ) { cout << "digit pls" << endl;    exit(1); } // EXIT[2]
      worker_num =     stoi( string( argv[1] ) );
    }
    catch ( const exception &e ) { cout << "exception while processing parameters" << endl; exit(1); } // EXIT[3]
 // _______________________________________  // ZeroMQ inproc use-case W/O IO-thread
    zmq::context_t context( 0 );             // 
 // _______________________________________  // 

    zmq::socket_t  distask_socket( context, ZMQ_PUSH );
                   distask_socket.bind( "inproc://task_publisher" );

    vector<thread> pool;

    for ( int i = 0; i < worker_num; i++ )
    {
      cout << "main() : creating thread, " << i << endl;
      pool.push_back( thread(           task1,
                              (void *) &context
                              )
                      );
    }

    for ( int i = 0; i < worker_num; i++ )
    {
      zmq::message_t msg( 6 );
      memcpy( (void *) msg.data(), "World", 6 ); 
      distask_socket.send( msg );
    }

    for ( auto &t : pool )
      t.join();

    exit(0);
}

If you are squeezing out the last bits of performance and shaving off sources of latency, this is worth doing.