如何设计 boost::asio 套接字或其包装器的正确释放

How to design proper release of a boost::asio socket or wrapper thereof

我正在尝试使用 boost::asio 制作我自己的简单异步 TCP 服务器,因为我已经好几年没有碰过它了。

我能找到的最新示例清单是: http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

这个示例列表的问题是(我觉得)它作弊而且作弊很大,通过使 tcp_connection 成为 shared_ptr,这样它就不用担心每个连接的生命周期管理。 (我认为)他们这样做是为了简洁,因为这是一个小教程,但该解决方案不是真实世界。

如果您想在计时器或类似的东西上向每个客户端发送消息怎么办? collection 客户端连接在任何现实世界中都是必需的 non-trivial 服务器。

我很担心每个连接的生命周期管理。我认为自然而然的做法是在 tcp_server 中保留 tcp_connection object 中的一些 collection 或指向它们的指针。从 OnConnect 回调中添加 collection 并从 collection OnDisconnect.

中删除

请注意,OnDisconnect 最有可能从实际的 Disconnect 方法中调用,如果出现错误,该方法又会从 OnReceive 回调或 OnSend 回调中调用。

嗯,问题就出在这里。

考虑一下我们有一个看起来像这样的调用堆栈:

tcp_connection::~tcp_connection
tcp_server::OnDisconnect
tcp_connection::OnDisconnect
tcp_connection::Disconnect
tcp_connection::OnReceive

这会导致错误,因为调用堆栈展开并且我们正在 object 中执行代码,它的析构函数被调用...我想,对吧?

我想每个做服务器编程的人都以某种方式遇到过这种情况。处理策略是什么?

我希望解释足够好。如果不让我知道,我会创建自己的源列表,但它会非常大。


编辑: 相关

) Memory management in asynchronous C++ code

IMO 不是一个可以接受的答案,依赖于 shared_ptr 出色的接听电话作弊,仅此而已,这不是真实世界。如果服务器想每 5 分钟对所有客户端说 "Hi" 怎么办?某种 collection 是必要的。如果您在多个线程上调用 io_service.run 怎么办?

我也在 boost 邮件列表上询问: http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

asio 解决 "deletion problem" 存在未完成的异步方法的方法是将每个启用异步的对象拆分为 3 个 classes,例如:

  • 服务器
  • server_service
  • server_impl

每个 io_loop 有一项服务(参见 use_service<>)。该服务为服务器创建一个 impl,现在是一个句柄 class.

这样就把句柄的生命周期和实现的生命周期分开了。

现在,在句柄的析构函数中,可以(通过服务)向 impl 发送消息以取消所有未完成的 IO。

如果需要,句柄的析构函数可以自由等待这些 io 调用排队(例如,如果服务器的工作被委托给后台 io 循环或线程池)。

以这种方式实现所有 io_service-enabled 对象已成为我的习惯,因为它使使用 aiso 进行编码变得非常简单。

就像我说的,我看不出如何使用智能指针 "cheating, and cheating big"。我也不认为你的评估 "they do this for brevity" 站得住脚。


这是我们代码库中经过轻微编辑的摘录¹,它举例说明了如何使用 shared_ptrs 不排除跟踪连接。

它仅显示服务器端的内容,

  • connection.hpp中的一个非常简单的connection对象;这使用 enable_shared_from_this

  • 只是固定大小 connection_pool(我们也有动态调整池的大小,因此有锁定原语)。请注意我们如何对所有活动连接执行操作。

    所以你会简单地写这样的东西来写给所有的客户,就像在计时器上一样:

    _pool.for_each_active([] (auto const& conn) {
        send_message(conn, hello_world_packet);
    });
    
  • 一个示例 listener 显示它如何与 connection_pool 联系(它有一个关闭所有连接的示例方法)

代码清单

  • connection.hpp

    #pragma once
    
    #include "xxx/net/rpc/protocol.hpp"
    #include "log.hpp"
    #include "stats_filer.hpp"
    #include <memory>
    
    namespace xxx { namespace net { namespace rpc {
    
        struct connection : std::enable_shared_from_this<connection>, protected LogSource {
            typedef std::shared_ptr<connection> ptr;
    
          private:
            friend struct io;
            friend struct listener;
    
            boost::asio::io_service& _svc;
            protocol::socket _socket;
            protocol::endpoint _ep;
            protocol::endpoint _peer;
          public:
    
            connection(boost::asio::io_service& svc, protocol::endpoint ep)
                : LogSource("rpc::connection"),
                  _svc(svc),
                  _socket(svc),
                  _ep(ep)
            {}
    
            void init() {
                _socket.set_option(protocol::no_delay(true));
                _peer = _socket.remote_endpoint();
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
                debug() << "New connection from " << _peer;
            }
    
            protocol::endpoint endpoint() const { return _ep;     } 
            protocol::endpoint peer() const     { return _peer;   } 
            protocol::socket&  socket()         { return _socket; } 
    
            // TODO encapsulation
            int handle() {
                return _socket.native_handle();
            }
    
            bool valid() const { return _socket.is_open(); }
    
            void cancel() {
                _svc.post([this] { _socket.cancel(); }); 
            }
    
            using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
            void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
                _svc.post([=] { _socket.shutdown(what); }); 
            }
    
            ~connection() {
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
            }
        };
    
    } } }
    
  • connection_pool.hpp

    #pragma once
    
    #include <mutex>
    #include "xxx/threads/null_mutex.hpp"
    #include "xxx/net/rpc/connection.hpp"
    #include "stats_filer.hpp"
    #include "log.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        // not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it
        template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
        struct basic_connection_pool : LogSource {
            using WeakPtr = std::weak_ptr<typename Ptr::element_type>;
    
            basic_connection_pool(std::string name = "connection_pool", size_t size)
                : LogSource(std::move(name)), _pool(size) 
            { }
    
            bool try_insert(Ptr const& conn) {
                std::lock_guard<Mutex> lk(_mx);
    
                auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired));
    
                if (slot == _pool.end()) {
                    g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
                    error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
                    return false;
                }
    
                *slot = conn;
                return true;
            }
    
            template <typename F>
            void for_each_active(F action) {
                auto locked = [=] {
                    using namespace std;
                    lock_guard<Mutex> lk(_mx);
                    vector<Ptr> locked(_pool.size());
                    transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock));
                    return locked;
                }();
    
                for (auto const& p : locked)
                    if (p) action(p);
            }
    
            constexpr static bool synchronizing() {
                return not std::is_same<xxx::threads::null_mutex, Mutex>();
            }
    
          private:
            void dump_stats(LogSource::LogTx tx) const {
                // lock is assumed!
                size_t empty = 0, busy = 0, idle = 0;
    
                for (auto& p : _pool) {
                    switch (p.use_count()) {
                        case 0:  empty++; break;
                        case 1:  idle++;  break;
                        default: busy++;  break;
                    }
                }
    
                tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
            }
    
            Mutex _mx;
            std::vector<WeakPtr> _pool;
        };
    
        // TODO FIXME use null_mutex once growing is no longer required AND if
        // en-pooling still only happens from the single IO thread (XXX-2535)
        using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;
    
    } } }
    
  • listener.hpp

    #pragma once
    
    #include "xxx/threads/null_mutex.hpp"
    #include <mutex>
    #include "xxx/net/rpc/connection_pool.hpp"
    #include "xxx/net/rpc/io_operations.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        struct listener : std::enable_shared_from_this<listener>, LogSource {
            typedef std::shared_ptr<listener> ptr;
    
            protocol::acceptor _acceptor;
            protocol::endpoint _ep;
    
            listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool) 
                : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
            {
                _acceptor.open(ep.protocol());
    
                _acceptor.set_option(protocol::acceptor::reuse_address(true));
                _acceptor.set_option(protocol::no_delay(true));
                ::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
                _acceptor.bind(ep);
    
                _acceptor.listen(32);
            }
    
            void accept_loop(std::function<void(connection::ptr conn)> on_accept) {
    
                auto self = shared_from_this();
                auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);
    
                _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
                    if (ec) {
                        auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
                        tx << "failed accept " << ec.message();
                    } else {
                        ::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
    
                        if (_pool.try_insert(conn)) {
                            on_accept(conn);
                        }
    
                        self->accept_loop(on_accept);
                    }
                });
            }
    
            void close() {
                _acceptor.cancel();
                _acceptor.close();
    
                _acceptor.get_io_service().post([=] {
                    _pool.for_each_active([] (auto const& sp) {
                        sp->shutdown(connection::shutdown_type::shutdown_both);
                        sp->cancel();
                    });
                });
    
                debug() << "shutdown";
            }
    
            ~listener() {
            }
    
          private:
            server_connection_pool& _pool;
        };
    
    } } }
    

¹ 下载为要点 https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2

连接生命周期是 boost::asio 的一个基本问题。根据经验,我可以向您保证,错误会导致 "undefined behaviour"...

asio 示例使用 shared_ptr 来确保连接保持活动状态,同时它可能在 asio::io_service 中有未完成的处理程序。请注意,即使在单个线程中,asio::io_service 也会与应用程序代码异步运行,请参阅 CppCon 2016: Michael Caisse "Asynchronous IO with Boost.Asio" 以获得对精确机制的出色描述。

A shared_ptr 使连接的生命周期由 shared_ptr 实例计数控制。恕我直言,这不是 "cheating and cheating big";而是解决复杂问题的优雅方法。

但是,我同意你的看法,即仅使用 shared_ptr 来控制连接生命周期并不是一个完整的解决方案,因为它会导致资源泄漏。

在我的回答中:Boost async_* functions and shared_ptr's, I proposed using a combination of shared_ptr and weak_ptr to manage connection lifetimes. An HTTP server using a combination of shared_ptr's and weak_ptr's can be found here: via-httplib

HTTP 服务器建立在一个异步 TCP 服务器之上,该服务器使用一组(shared_ptr 到)连接,根据您的建议在连接时创建并在断开连接时销毁。

虽然其他人的回答与此答案的后半部分类似,但我能找到的最完整的答案似乎来自在 Boost 邮件列表中提出的相同问题。

http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

这里总结一下,方便以后搜索到这里的人。

有2个选项

1) 关闭套接字以取消任何未完成的 io,然后 post 回调 io_service 上的 post-断开连接逻辑并让服务器 class 当套接字断开连接时回调。然后它可以安全地释放连接。只要只有一个线程调用了io_service::run,那么其他异步操作在回调时就已经解决了。但是,如果有多个线程调用了io_service::run,那么这是不安全的。

2) 正如其他人在他们的回答中指出的那样,使用 shared_ptr 来管理连接的生命周期,使用出色的 io 操作来保持它们的存活是可行的。如果需要,我们可以使用连接集合 weak_ptr 来访问它们。后者是其他 post 中关于让我感到困惑的主题的小道消息。