如何设计 boost::asio 套接字或其包装器的正确释放
How to design proper release of a boost::asio socket or wrapper thereof
我正在尝试使用 boost::asio 制作我自己的简单异步 TCP 服务器,因为我已经好几年没有碰过它了。
我能找到的最新示例清单是:
http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html
这个示例列表的问题是(我觉得)它作弊而且作弊很大,通过使 tcp_connection 成为 shared_ptr,这样它就不用担心每个连接的生命周期管理。 (我认为)他们这样做是为了简洁,因为这是一个小教程,但该解决方案不是真实世界。
如果您想在计时器或类似的东西上向每个客户端发送消息怎么办? collection 客户端连接在任何现实世界中都是必需的 non-trivial 服务器。
我很担心每个连接的生命周期管理。我认为自然而然的做法是在 tcp_server 中保留 tcp_connection object 中的一些 collection 或指向它们的指针。从 OnConnect 回调中添加 collection 并从 collection OnDisconnect.
中删除
请注意,OnDisconnect 最有可能从实际的 Disconnect 方法中调用,如果出现错误,该方法又会从 OnReceive 回调或 OnSend 回调中调用。
嗯,问题就出在这里。
考虑一下我们有一个看起来像这样的调用堆栈:
tcp_connection::~tcp_connection
tcp_server::OnDisconnect
tcp_connection::OnDisconnect
tcp_connection::Disconnect
tcp_connection::OnReceive
这会导致错误,因为调用堆栈展开并且我们正在 object 中执行代码,它的析构函数被调用...我想,对吧?
我想每个做服务器编程的人都以某种方式遇到过这种情况。处理策略是什么?
我希望解释足够好。如果不让我知道,我会创建自己的源列表,但它会非常大。
编辑:
相关
) Memory management in asynchronous C++ code
IMO 不是一个可以接受的答案,依赖于 shared_ptr 出色的接听电话作弊,仅此而已,这不是真实世界。如果服务器想每 5 分钟对所有客户端说 "Hi" 怎么办?某种 collection 是必要的。如果您在多个线程上调用 io_service.run 怎么办?
我也在 boost 邮件列表上询问:
http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html
asio 解决 "deletion problem" 存在未完成的异步方法的方法是将每个启用异步的对象拆分为 3 个 classes,例如:
- 服务器
- server_service
- server_impl
每个 io_loop 有一项服务(参见 use_service<>
)。该服务为服务器创建一个 impl,现在是一个句柄 class.
这样就把句柄的生命周期和实现的生命周期分开了。
现在,在句柄的析构函数中,可以(通过服务)向 impl 发送消息以取消所有未完成的 IO。
如果需要,句柄的析构函数可以自由等待这些 io 调用排队(例如,如果服务器的工作被委托给后台 io 循环或线程池)。
以这种方式实现所有 io_service-enabled 对象已成为我的习惯,因为它使使用 aiso 进行编码变得非常简单。
就像我说的,我看不出如何使用智能指针 "cheating, and cheating big"。我也不认为你的评估 "they do this for brevity" 站得住脚。
这是我们代码库中经过轻微编辑的摘录¹,它举例说明了如何使用 shared_ptrs 不排除跟踪连接。
它仅显示服务器端的内容,
connection.hpp中的一个非常简单的connection
对象;这使用 enable_shared_from_this
只是固定大小 connection_pool
(我们也有动态调整池的大小,因此有锁定原语)。请注意我们如何对所有活动连接执行操作。
所以你会简单地写这样的东西来写给所有的客户,就像在计时器上一样:
_pool.for_each_active([] (auto const& conn) {
send_message(conn, hello_world_packet);
});
一个示例 listener
显示它如何与 connection_pool
联系(它有一个关闭所有连接的示例方法)
代码清单
connection.hpp
#pragma once
#include "xxx/net/rpc/protocol.hpp"
#include "log.hpp"
#include "stats_filer.hpp"
#include <memory>
namespace xxx { namespace net { namespace rpc {
struct connection : std::enable_shared_from_this<connection>, protected LogSource {
typedef std::shared_ptr<connection> ptr;
private:
friend struct io;
friend struct listener;
boost::asio::io_service& _svc;
protocol::socket _socket;
protocol::endpoint _ep;
protocol::endpoint _peer;
public:
connection(boost::asio::io_service& svc, protocol::endpoint ep)
: LogSource("rpc::connection"),
_svc(svc),
_socket(svc),
_ep(ep)
{}
void init() {
_socket.set_option(protocol::no_delay(true));
_peer = _socket.remote_endpoint();
g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
debug() << "New connection from " << _peer;
}
protocol::endpoint endpoint() const { return _ep; }
protocol::endpoint peer() const { return _peer; }
protocol::socket& socket() { return _socket; }
// TODO encapsulation
int handle() {
return _socket.native_handle();
}
bool valid() const { return _socket.is_open(); }
void cancel() {
_svc.post([this] { _socket.cancel(); });
}
using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
_svc.post([=] { _socket.shutdown(what); });
}
~connection() {
g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
}
};
} } }
connection_pool.hpp
#pragma once
#include <mutex>
#include "xxx/threads/null_mutex.hpp"
#include "xxx/net/rpc/connection.hpp"
#include "stats_filer.hpp"
#include "log.hpp"
namespace xxx { namespace net { namespace rpc {
// not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it
template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
struct basic_connection_pool : LogSource {
using WeakPtr = std::weak_ptr<typename Ptr::element_type>;
basic_connection_pool(std::string name = "connection_pool", size_t size)
: LogSource(std::move(name)), _pool(size)
{ }
bool try_insert(Ptr const& conn) {
std::lock_guard<Mutex> lk(_mx);
auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired));
if (slot == _pool.end()) {
g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
return false;
}
*slot = conn;
return true;
}
template <typename F>
void for_each_active(F action) {
auto locked = [=] {
using namespace std;
lock_guard<Mutex> lk(_mx);
vector<Ptr> locked(_pool.size());
transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock));
return locked;
}();
for (auto const& p : locked)
if (p) action(p);
}
constexpr static bool synchronizing() {
return not std::is_same<xxx::threads::null_mutex, Mutex>();
}
private:
void dump_stats(LogSource::LogTx tx) const {
// lock is assumed!
size_t empty = 0, busy = 0, idle = 0;
for (auto& p : _pool) {
switch (p.use_count()) {
case 0: empty++; break;
case 1: idle++; break;
default: busy++; break;
}
}
tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
}
Mutex _mx;
std::vector<WeakPtr> _pool;
};
// TODO FIXME use null_mutex once growing is no longer required AND if
// en-pooling still only happens from the single IO thread (XXX-2535)
using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;
} } }
listener.hpp
#pragma once
#include "xxx/threads/null_mutex.hpp"
#include <mutex>
#include "xxx/net/rpc/connection_pool.hpp"
#include "xxx/net/rpc/io_operations.hpp"
namespace xxx { namespace net { namespace rpc {
struct listener : std::enable_shared_from_this<listener>, LogSource {
typedef std::shared_ptr<listener> ptr;
protocol::acceptor _acceptor;
protocol::endpoint _ep;
listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool)
: LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
{
_acceptor.open(ep.protocol());
_acceptor.set_option(protocol::acceptor::reuse_address(true));
_acceptor.set_option(protocol::no_delay(true));
::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
_acceptor.bind(ep);
_acceptor.listen(32);
}
void accept_loop(std::function<void(connection::ptr conn)> on_accept) {
auto self = shared_from_this();
auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);
_acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
if (ec) {
auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
tx << "failed accept " << ec.message();
} else {
::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
if (_pool.try_insert(conn)) {
on_accept(conn);
}
self->accept_loop(on_accept);
}
});
}
void close() {
_acceptor.cancel();
_acceptor.close();
_acceptor.get_io_service().post([=] {
_pool.for_each_active([] (auto const& sp) {
sp->shutdown(connection::shutdown_type::shutdown_both);
sp->cancel();
});
});
debug() << "shutdown";
}
~listener() {
}
private:
server_connection_pool& _pool;
};
} } }
¹ 下载为要点 https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2
连接生命周期是 boost::asio
的一个基本问题。根据经验,我可以向您保证,错误会导致 "undefined behaviour"...
asio
示例使用 shared_ptr
来确保连接保持活动状态,同时它可能在 asio::io_service
中有未完成的处理程序。请注意,即使在单个线程中,asio::io_service
也会与应用程序代码异步运行,请参阅 CppCon 2016: Michael Caisse "Asynchronous IO with Boost.Asio" 以获得对精确机制的出色描述。
A shared_ptr
使连接的生命周期由 shared_ptr
实例计数控制。恕我直言,这不是 "cheating and cheating big";而是解决复杂问题的优雅方法。
但是,我同意你的看法,即仅使用 shared_ptr
来控制连接生命周期并不是一个完整的解决方案,因为它会导致资源泄漏。
在我的回答中:Boost async_* functions and shared_ptr's, I proposed using a combination of shared_ptr
and weak_ptr
to manage connection lifetimes. An HTTP server using a combination of shared_ptr
's and weak_ptr
's can be found here: via-httplib。
HTTP 服务器建立在一个异步 TCP 服务器之上,该服务器使用一组(shared_ptr
到)连接,根据您的建议在连接时创建并在断开连接时销毁。
虽然其他人的回答与此答案的后半部分类似,但我能找到的最完整的答案似乎来自在 Boost 邮件列表中提出的相同问题。
这里总结一下,方便以后搜索到这里的人。
有2个选项
1) 关闭套接字以取消任何未完成的 io,然后 post 回调 io_service 上的 post-断开连接逻辑并让服务器 class 当套接字断开连接时回调。然后它可以安全地释放连接。只要只有一个线程调用了io_service::run,那么其他异步操作在回调时就已经解决了。但是,如果有多个线程调用了io_service::run,那么这是不安全的。
2) 正如其他人在他们的回答中指出的那样,使用 shared_ptr 来管理连接的生命周期,使用出色的 io 操作来保持它们的存活是可行的。如果需要,我们可以使用连接集合 weak_ptr 来访问它们。后者是其他 post 中关于让我感到困惑的主题的小道消息。
我正在尝试使用 boost::asio 制作我自己的简单异步 TCP 服务器,因为我已经好几年没有碰过它了。
我能找到的最新示例清单是: http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html
这个示例列表的问题是(我觉得)它作弊而且作弊很大,通过使 tcp_connection 成为 shared_ptr,这样它就不用担心每个连接的生命周期管理。 (我认为)他们这样做是为了简洁,因为这是一个小教程,但该解决方案不是真实世界。
如果您想在计时器或类似的东西上向每个客户端发送消息怎么办? collection 客户端连接在任何现实世界中都是必需的 non-trivial 服务器。
我很担心每个连接的生命周期管理。我认为自然而然的做法是在 tcp_server 中保留 tcp_connection object 中的一些 collection 或指向它们的指针。从 OnConnect 回调中添加 collection 并从 collection OnDisconnect.
中删除请注意,OnDisconnect 最有可能从实际的 Disconnect 方法中调用,如果出现错误,该方法又会从 OnReceive 回调或 OnSend 回调中调用。
嗯,问题就出在这里。
考虑一下我们有一个看起来像这样的调用堆栈:
tcp_connection::~tcp_connection
tcp_server::OnDisconnect
tcp_connection::OnDisconnect
tcp_connection::Disconnect
tcp_connection::OnReceive
这会导致错误,因为调用堆栈展开并且我们正在 object 中执行代码,它的析构函数被调用...我想,对吧?
我想每个做服务器编程的人都以某种方式遇到过这种情况。处理策略是什么?
我希望解释足够好。如果不让我知道,我会创建自己的源列表,但它会非常大。
编辑: 相关
) Memory management in asynchronous C++ code
IMO 不是一个可以接受的答案,依赖于 shared_ptr 出色的接听电话作弊,仅此而已,这不是真实世界。如果服务器想每 5 分钟对所有客户端说 "Hi" 怎么办?某种 collection 是必要的。如果您在多个线程上调用 io_service.run 怎么办?
我也在 boost 邮件列表上询问: http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html
asio 解决 "deletion problem" 存在未完成的异步方法的方法是将每个启用异步的对象拆分为 3 个 classes,例如:
- 服务器
- server_service
- server_impl
每个 io_loop 有一项服务(参见 use_service<>
)。该服务为服务器创建一个 impl,现在是一个句柄 class.
这样就把句柄的生命周期和实现的生命周期分开了。
现在,在句柄的析构函数中,可以(通过服务)向 impl 发送消息以取消所有未完成的 IO。
如果需要,句柄的析构函数可以自由等待这些 io 调用排队(例如,如果服务器的工作被委托给后台 io 循环或线程池)。
以这种方式实现所有 io_service-enabled 对象已成为我的习惯,因为它使使用 aiso 进行编码变得非常简单。
就像我说的,我看不出如何使用智能指针 "cheating, and cheating big"。我也不认为你的评估 "they do this for brevity" 站得住脚。
这是我们代码库中经过轻微编辑的摘录¹,它举例说明了如何使用 shared_ptrs 不排除跟踪连接。
它仅显示服务器端的内容,
connection.hpp中的一个非常简单的
connection
对象;这使用enable_shared_from_this
只是固定大小
connection_pool
(我们也有动态调整池的大小,因此有锁定原语)。请注意我们如何对所有活动连接执行操作。所以你会简单地写这样的东西来写给所有的客户,就像在计时器上一样:
_pool.for_each_active([] (auto const& conn) { send_message(conn, hello_world_packet); });
一个示例
listener
显示它如何与connection_pool
联系(它有一个关闭所有连接的示例方法)
代码清单
connection.hpp
#pragma once #include "xxx/net/rpc/protocol.hpp" #include "log.hpp" #include "stats_filer.hpp" #include <memory> namespace xxx { namespace net { namespace rpc { struct connection : std::enable_shared_from_this<connection>, protected LogSource { typedef std::shared_ptr<connection> ptr; private: friend struct io; friend struct listener; boost::asio::io_service& _svc; protocol::socket _socket; protocol::endpoint _ep; protocol::endpoint _peer; public: connection(boost::asio::io_service& svc, protocol::endpoint ep) : LogSource("rpc::connection"), _svc(svc), _socket(svc), _ep(ep) {} void init() { _socket.set_option(protocol::no_delay(true)); _peer = _socket.remote_endpoint(); g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted"); debug() << "New connection from " << _peer; } protocol::endpoint endpoint() const { return _ep; } protocol::endpoint peer() const { return _peer; } protocol::socket& socket() { return _socket; } // TODO encapsulation int handle() { return _socket.native_handle(); } bool valid() const { return _socket.is_open(); } void cancel() { _svc.post([this] { _socket.cancel(); }); } using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type; void shutdown(shutdown_type what = shutdown_type::shutdown_both) { _svc.post([=] { _socket.shutdown(what); }); } ~connection() { g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected"); } }; } } }
connection_pool.hpp
#pragma once #include <mutex> #include "xxx/threads/null_mutex.hpp" #include "xxx/net/rpc/connection.hpp" #include "stats_filer.hpp" #include "log.hpp" namespace xxx { namespace net { namespace rpc { // not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex> struct basic_connection_pool : LogSource { using WeakPtr = std::weak_ptr<typename Ptr::element_type>; basic_connection_pool(std::string name = "connection_pool", size_t size) : LogSource(std::move(name)), _pool(size) { } bool try_insert(Ptr const& conn) { std::lock_guard<Mutex> lk(_mx); auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired)); if (slot == _pool.end()) { g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped"); error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated"; return false; } *slot = conn; return true; } template <typename F> void for_each_active(F action) { auto locked = [=] { using namespace std; lock_guard<Mutex> lk(_mx); vector<Ptr> locked(_pool.size()); transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock)); return locked; }(); for (auto const& p : locked) if (p) action(p); } constexpr static bool synchronizing() { return not std::is_same<xxx::threads::null_mutex, Mutex>(); } private: void dump_stats(LogSource::LogTx tx) const { // lock is assumed! size_t empty = 0, busy = 0, idle = 0; for (auto& p : _pool) { switch (p.use_count()) { case 0: empty++; break; case 1: idle++; break; default: busy++; break; } } tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle; } Mutex _mx; std::vector<WeakPtr> _pool; }; // TODO FIXME use null_mutex once growing is no longer required AND if // en-pooling still only happens from the single IO thread (XXX-2535) using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>; } } }
listener.hpp
#pragma once #include "xxx/threads/null_mutex.hpp" #include <mutex> #include "xxx/net/rpc/connection_pool.hpp" #include "xxx/net/rpc/io_operations.hpp" namespace xxx { namespace net { namespace rpc { struct listener : std::enable_shared_from_this<listener>, LogSource { typedef std::shared_ptr<listener> ptr; protocol::acceptor _acceptor; protocol::endpoint _ep; listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool) : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool) { _acceptor.open(ep.protocol()); _acceptor.set_option(protocol::acceptor::reuse_address(true)); _acceptor.set_option(protocol::no_delay(true)); ::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory? _acceptor.bind(ep); _acceptor.listen(32); } void accept_loop(std::function<void(connection::ptr conn)> on_accept) { auto self = shared_from_this(); auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep); _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) { if (ec) { auto tx = ec == boost::asio::error::operation_aborted? debug() : warn(); tx << "failed accept " << ec.message(); } else { ::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory? if (_pool.try_insert(conn)) { on_accept(conn); } self->accept_loop(on_accept); } }); } void close() { _acceptor.cancel(); _acceptor.close(); _acceptor.get_io_service().post([=] { _pool.for_each_active([] (auto const& sp) { sp->shutdown(connection::shutdown_type::shutdown_both); sp->cancel(); }); }); debug() << "shutdown"; } ~listener() { } private: server_connection_pool& _pool; }; } } }
¹ 下载为要点 https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2
连接生命周期是 boost::asio
的一个基本问题。根据经验,我可以向您保证,错误会导致 "undefined behaviour"...
asio
示例使用 shared_ptr
来确保连接保持活动状态,同时它可能在 asio::io_service
中有未完成的处理程序。请注意,即使在单个线程中,asio::io_service
也会与应用程序代码异步运行,请参阅 CppCon 2016: Michael Caisse "Asynchronous IO with Boost.Asio" 以获得对精确机制的出色描述。
A shared_ptr
使连接的生命周期由 shared_ptr
实例计数控制。恕我直言,这不是 "cheating and cheating big";而是解决复杂问题的优雅方法。
但是,我同意你的看法,即仅使用 shared_ptr
来控制连接生命周期并不是一个完整的解决方案,因为它会导致资源泄漏。
在我的回答中:Boost async_* functions and shared_ptr's, I proposed using a combination of shared_ptr
and weak_ptr
to manage connection lifetimes. An HTTP server using a combination of shared_ptr
's and weak_ptr
's can be found here: via-httplib。
HTTP 服务器建立在一个异步 TCP 服务器之上,该服务器使用一组(shared_ptr
到)连接,根据您的建议在连接时创建并在断开连接时销毁。
虽然其他人的回答与此答案的后半部分类似,但我能找到的最完整的答案似乎来自在 Boost 邮件列表中提出的相同问题。
这里总结一下,方便以后搜索到这里的人。
有2个选项
1) 关闭套接字以取消任何未完成的 io,然后 post 回调 io_service 上的 post-断开连接逻辑并让服务器 class 当套接字断开连接时回调。然后它可以安全地释放连接。只要只有一个线程调用了io_service::run,那么其他异步操作在回调时就已经解决了。但是,如果有多个线程调用了io_service::run,那么这是不安全的。
2) 正如其他人在他们的回答中指出的那样,使用 shared_ptr 来管理连接的生命周期,使用出色的 io 操作来保持它们的存活是可行的。如果需要,我们可以使用连接集合 weak_ptr 来访问它们。后者是其他 post 中关于让我感到困惑的主题的小道消息。