使用 Boost 发送序列化结构但出现输入流错误
sending serialized structure with Boost but getting input stream error
这是 的后续问题。
我正在解决 2016 Marathon of Parallel Programming 的第一个挑战。本题与第一个挑战:字符串解析相关。
如果你看了here的问题集,显然第5步是可并行的,也就是第3页的[Try all derivations recursively.]
所以我想通过使用 ZeroMQ 和 Boost 在下面实现这个来并行化解决方案(以序列化结构 res_package
,其中包含工作线程的作业和主线程的结果):
main
中的PUSH套接字组是用来分发任务的。 main
中的 PULL
套接字组从工作线程收集结果。最后 PUB
套接字组用于向 worker 发送 kill 信号。
main
和 worker
之间有一个同步步骤,否则 main
中的 PUSH
套接字会向第一个连接的工作线程发送大量作业。我引用我的 post:
So what main thread does is: pushing #worker_num of sync msgs with its PUSH endpoint to worker threads each time and then reads confirmation msg from its PULL endpoint. If main thread retrieves #worker_num of confirmation msgs, then sync done. Format of the sync msg from worker is: the worker thread's ID in a string. So thread 0 would pass a 0 in string back to main thread.
如果主线程收到一个有意义的结果,即第二个字段为true
的Eval
(意味着该字符串被语法接受),主线程将发布kill信号。所有工作线程发回确认后,即struct res_package
中的字段bool exit_confirmed
,主线程加入工作线程并打印最终结果。
问题是,我从 Boost 得到了 运行时间错误。我不知道发生了什么:
# ./spec < ./spec.in
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread
456Dynamic exception type: boost::archive::archive_exception
std::exception::what: input stream error
此异常是由第 912 行(来自 GDB 回溯)引起的。所以我猜问题是关于悬挂指针,或者在第 909 行以某种方式接收到的 t运行。但是我不知道如何走得更远。
要构建我的项目,您需要一些文件:spec.cc, spec.hh, Makefile
您可以从 here 下载原始项目。有spec.in
和judge.in
用于测试。
要在 Ubuntu、运行 上安装依赖项:
apt-get install -y libzmqpp3 libzmqpp-dev libzmq5 libzmq5-dbg libboost-all-dev build-essential g++
一个重要的数据结构是:
struct res_package
{
int i;
Set <Stack> ls; // from worker thread to main thread
Stack s; // from main thread to worker thread, job to process
bool if_accepted;
bool exit_confirmed;
Eval res;
bool set_nonempty;
// striped.....
template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar & i;
ar & if_accepted;
ar & exit_confirmed;
ar & s;
ar & ls;
ar & res;
ar & set_nonempty;
}
}
工作线程接收 Stack s
并查看字符串是否可以被规则 s
接受。如果是,它将 if_accepted
设置为真,然后发回 Eval res
。如果没有,并且找到了多个可能的规则,它会发回一组所有可能的规则 ls
,并将 bool set_nonempty
设置为 true 以指示主要分发这些规则。
如果您感到困惑,请post发表评论。
您没有在 post 中包含序列化代码,但我在 spec.cc 中找到了它:
{
std::ostringstream obuffer;
boost::archive::text_oarchive oarchive(obuffer);
tmp.exit_confirmed = true;
oarchive & tmp;
std::string req_str(obuffer.str());
zmq::message_t res_msg(req_str.size());
memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
sendres_socket.send(res_msg);
}
此处您在存档关闭前发送流。尝试明确关闭它,或限制生命周期:
{
std::ostringstream obuffer;
{
boost::archive::text_oarchive oarchive(obuffer);
tmp.exit_confirmed = true;
oarchive & tmp;
}
std::string req_str(obuffer.str());
zmq::message_t res_msg(req_str.size());
memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
sendres_socket.send(res_msg);
}
这是
我正在解决 2016 Marathon of Parallel Programming 的第一个挑战。本题与第一个挑战:字符串解析相关。
如果你看了here的问题集,显然第5步是可并行的,也就是第3页的[Try all derivations recursively.]
所以我想通过使用 ZeroMQ 和 Boost 在下面实现这个来并行化解决方案(以序列化结构 res_package
,其中包含工作线程的作业和主线程的结果):
main
中的PUSH套接字组是用来分发任务的。 main
中的 PULL
套接字组从工作线程收集结果。最后 PUB
套接字组用于向 worker 发送 kill 信号。
main
和 worker
之间有一个同步步骤,否则 main
中的 PUSH
套接字会向第一个连接的工作线程发送大量作业。我引用我的 post:
So what main thread does is: pushing #worker_num of sync msgs with its PUSH endpoint to worker threads each time and then reads confirmation msg from its PULL endpoint. If main thread retrieves #worker_num of confirmation msgs, then sync done. Format of the sync msg from worker is: the worker thread's ID in a string. So thread 0 would pass a 0 in string back to main thread.
如果主线程收到一个有意义的结果,即第二个字段为true
的Eval
(意味着该字符串被语法接受),主线程将发布kill信号。所有工作线程发回确认后,即struct res_package
中的字段bool exit_confirmed
,主线程加入工作线程并打印最终结果。
问题是,我从 Boost 得到了 运行时间错误。我不知道发生了什么:
# ./spec < ./spec.in
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread
456Dynamic exception type: boost::archive::archive_exception
std::exception::what: input stream error
此异常是由第 912 行(来自 GDB 回溯)引起的。所以我猜问题是关于悬挂指针,或者在第 909 行以某种方式接收到的 t运行。但是我不知道如何走得更远。
要构建我的项目,您需要一些文件:spec.cc, spec.hh, Makefile
您可以从 here 下载原始项目。有spec.in
和judge.in
用于测试。
要在 Ubuntu、运行 上安装依赖项:
apt-get install -y libzmqpp3 libzmqpp-dev libzmq5 libzmq5-dbg libboost-all-dev build-essential g++
一个重要的数据结构是:
struct res_package
{
int i;
Set <Stack> ls; // from worker thread to main thread
Stack s; // from main thread to worker thread, job to process
bool if_accepted;
bool exit_confirmed;
Eval res;
bool set_nonempty;
// striped.....
template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar & i;
ar & if_accepted;
ar & exit_confirmed;
ar & s;
ar & ls;
ar & res;
ar & set_nonempty;
}
}
工作线程接收 Stack s
并查看字符串是否可以被规则 s
接受。如果是,它将 if_accepted
设置为真,然后发回 Eval res
。如果没有,并且找到了多个可能的规则,它会发回一组所有可能的规则 ls
,并将 bool set_nonempty
设置为 true 以指示主要分发这些规则。
如果您感到困惑,请post发表评论。
您没有在 post 中包含序列化代码,但我在 spec.cc 中找到了它:
{
std::ostringstream obuffer;
boost::archive::text_oarchive oarchive(obuffer);
tmp.exit_confirmed = true;
oarchive & tmp;
std::string req_str(obuffer.str());
zmq::message_t res_msg(req_str.size());
memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
sendres_socket.send(res_msg);
}
此处您在存档关闭前发送流。尝试明确关闭它,或限制生命周期:
{
std::ostringstream obuffer;
{
boost::archive::text_oarchive oarchive(obuffer);
tmp.exit_confirmed = true;
oarchive & tmp;
}
std::string req_str(obuffer.str());
zmq::message_t res_msg(req_str.size());
memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
sendres_socket.send(res_msg);
}