使用 Boost 发送序列化结构但出现输入流错误

sending serialized structure with Boost but getting input stream error

这是 的后续问题。

我正在解决 2016 Marathon of Parallel Programming 的第一个挑战。本题与第一个挑战:字符串解析相关。

如果你看了here的问题集,显然第5步是可并行的,也就是第3页的[Try all derivations recursively.]

所以我想通过使用 ZeroMQ 和 Boost 在下面实现这个来并行化解决方案(以序列化结构 res_package,其中包含工作线程的作业和主线程的结果):

main中的PUSH套接字组是用来分发任务的。 main 中的 PULL 套接字组从工作线程收集结果。最后 PUB 套接字组用于向 worker 发送 kill 信号。

mainworker 之间有一个同步步骤,否则 main 中的 PUSH 套接字会向第一个连接的工作线程发送大量作业。我引用我的 post:

So what main thread does is: pushing #worker_num of sync msgs with its PUSH endpoint to worker threads each time and then reads confirmation msg from its PULL endpoint. If main thread retrieves #worker_num of confirmation msgs, then sync done. Format of the sync msg from worker is: the worker thread's ID in a string. So thread 0 would pass a 0 in string back to main thread.

如果主线程收到一个有意义的结果,即第二个字段为trueEval(意味着该字符串被语法接受),主线程将发布kill信号。所有工作线程发回确认后,即struct res_package中的字段bool exit_confirmed,主线程加入工作线程并打印最终结果。

问题是,我从 Boost 得到了 运行时间错误。我不知道发生了什么:

# ./spec < ./spec.in
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread
456Dynamic exception type: boost::archive::archive_exception
std::exception::what: input stream error

此异常是由第 912 行(来自 GDB 回溯)引起的。所以我猜问题是关于悬挂指针,或者在第 909 行以某种方式接收到的 t运行。但是我不知道如何走得更远。

要构建我的项目,您需要一些文件:spec.cc, spec.hh, Makefile

您可以从 here 下载原始项目。有spec.injudge.in用于测试。

要在 Ubuntu、运行 上安装依赖项:

apt-get install -y libzmqpp3 libzmqpp-dev libzmq5 libzmq5-dbg libboost-all-dev build-essential g++

一个重要的数据结构是:

struct res_package
{
  int i;
  Set <Stack> ls; // from worker thread to main thread
  Stack s; // from main thread to worker thread, job to process
  bool if_accepted;
  bool exit_confirmed;
  Eval res;
  bool set_nonempty;

// striped.....

  template <typename Archive>
  void serialize(Archive& ar, const unsigned int version)
  {
    ar & i;
    ar & if_accepted;
    ar & exit_confirmed;
    ar & s;
    ar & ls;
    ar & res;
    ar & set_nonempty;
  }
}

工作线程接收 Stack s 并查看字符串是否可以被规则 s 接受。如果是,它将 if_accepted 设置为真,然后发回 Eval res。如果没有,并且找到了多个可能的规则,它会发回一组所有可能的规则 ls,并将 bool set_nonempty 设置为 true 以指示主要分发这些规则。

如果您感到困惑,请post发表评论。

您没有在 post 中包含序列化代码,但我在 spec.cc 中找到了它:

{
    std::ostringstream obuffer;
    boost::archive::text_oarchive oarchive(obuffer);

    tmp.exit_confirmed = true;
    oarchive & tmp;
    std::string req_str(obuffer.str());
    zmq::message_t res_msg(req_str.size());
    memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
    sendres_socket.send(res_msg);
}

此处您在存档关闭前发送流。尝试明确关闭它,或限制生命周期:

{
    std::ostringstream obuffer;
    {
        boost::archive::text_oarchive oarchive(obuffer);

        tmp.exit_confirmed = true;
        oarchive & tmp;
    }
    std::string req_str(obuffer.str());
    zmq::message_t res_msg(req_str.size());
    memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
    sendres_socket.send(res_msg);
}