Linux 上的 Asio 在 epoll() 中停滞

Asio on Linux stalls in epoll()

我们在 Linux 上独立(非 Boost)Asio 1.10.6 的异步操作遇到问题,使用以下测试应用程序进行了演示:

#define ASIO_STANDALONE
#define ASIO_HEADER_ONLY
#define ASIO_NO_EXCEPTIONS
#define ASIO_NO_TYPEID
#include "asio.hpp"

#include <chrono>
#include <iostream>
#include <list>
#include <map>
#include <thread>

static bool s_freeInboundSocket = false;
static bool s_freeOutboundSocket = false;

class Tester
{
public:
    Tester(asio::io_service& i_ioService, unsigned i_n)
        : m_inboundStrand(i_ioService)
        , m_listener(i_ioService)
        , m_outboundStrand(i_ioService)
        , m_resolver(i_ioService)
        , m_n(i_n)
        , m_traceStart(std::chrono::high_resolution_clock::now())
    {}

    ~Tester()
    {}

    void TraceIn(unsigned i_line)
    {
        m_inboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
    }

    void AbortIn(unsigned i_line)
    {
        TraceIn(i_line);
        abort();
    }

    void TraceOut(unsigned i_line)
    {
        m_outboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
    }

    void AbortOut(unsigned i_line)
    {
        TraceOut(i_line);
        abort();
    }

    void DumpTrace(std::map<unsigned, unsigned>& o_counts)
    {
        std::cout << "## " << m_n << " ##\n";
        std::cout << "-- " << m_traceStart.time_since_epoch().count() << "\n";
        std::cout << "- in -             - out -\n";

        auto in = m_inboundTrace.begin();
        auto out = m_outboundTrace.begin();

        while ((in != m_inboundTrace.end()) || (out != m_outboundTrace.end()))
        {
            if (in == m_inboundTrace.end()) 
            {
                ++o_counts[out->first];

                std::cout << "                  " << out->first << " : " << out->second.count() << "\n";
                ++out;
            }
            else if (out == m_outboundTrace.end())  
            {
                ++o_counts[in->first];

                std::cout << in->first << " : " << in->second.count() << "\n";
                ++in;
            }
            else if (out->second < in->second)
            {
                ++o_counts[out->first];

                std::cout << "                  " << out->first << " : " << out->second.count() << "\n";
                ++out;
            }
            else
            {
                ++o_counts[in->first];

                std::cout << in->first << " : " << in->second.count() << "\n";
                ++in;
            }
        }
        std::cout << std::endl;
    }

    //////////////
    // Inbound

    void Listen(uint16_t i_portBase)
    {
        m_inboundSocket.reset(new asio::ip::tcp::socket(m_inboundStrand.get_io_service()));

        asio::error_code ec;
        if (m_listener.open(asio::ip::tcp::v4(), ec)
         || m_listener.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), i_portBase+m_n), ec)
         || m_listener.listen(-1, ec))
        {
            AbortIn(__LINE__); return;
        }

        TraceIn(__LINE__);

        m_listener.async_accept(*m_inboundSocket,
            m_inboundStrand.wrap([this](const asio::error_code& i_error)
        {
            OnInboundAccepted(i_error);
        }));
    }

    void OnInboundAccepted(const asio::error_code& i_error)
    {
        TraceIn(__LINE__);

        if (i_error) { AbortIn(__LINE__); return; }

        asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
            m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
        {
            OnInboundReadCompleted(i_err, i_nRd);
        }));
    }


    void OnInboundReadCompleted(const asio::error_code& i_error, size_t i_nRead)
    {
        TraceIn(__LINE__);

        if (i_error.value() != 0) { AbortIn(__LINE__); return; }
        if (bool(i_error)) { AbortIn(__LINE__); return; }
        if (i_nRead != 4) { AbortIn(__LINE__); return; }  // "msg\n"

        std::istream is(&m_inboundRxBuf);
        std::string s;
        if (!std::getline(is, s)) { AbortIn(__LINE__); return; }
        if (s != "msg") { AbortIn(__LINE__); return; }
        if (m_inboundRxBuf.in_avail() != 0) { AbortIn(__LINE__); return; }

        asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
            m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
        {
            OnInboundWaitCompleted(i_err, i_nRd);
        }));

    }

    void OnInboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
    {
        TraceIn(__LINE__);

        if (i_error != asio::error::eof) { AbortIn(__LINE__); return; }
        if (i_nRead != 0) { AbortIn(__LINE__); return; }

        if (s_freeInboundSocket)
        {
            m_inboundSocket.reset();
        }
    }

    //////////////
    // Outbound

    void Connect(std::string i_host, uint16_t i_portBase)
    {
        asio::error_code ec;
        auto endpoint = m_resolver.resolve(asio::ip::tcp::resolver::query(i_host, std::to_string(i_portBase+m_n)), ec);
        if (ec) { AbortOut(__LINE__); return; }

        m_outboundSocket.reset(new asio::ip::tcp::socket(m_outboundStrand.get_io_service()));

        TraceOut(__LINE__);

        asio::async_connect(*m_outboundSocket, endpoint,
            m_outboundStrand.wrap([this](const std::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_ep)
        {
            OnOutboundConnected(i_error, i_ep);
        }));
    }

    void OnOutboundConnected(const asio::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_endpoint)
    {
        TraceOut(__LINE__);

        if (i_error) { AbortOut(__LINE__); return; }

        std::ostream(&m_outboundTxBuf) << "msg" << '\n';

        asio::async_write(*m_outboundSocket, m_outboundTxBuf.data(),
            m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nWritten)
        {
            OnOutboundWriteCompleted(i_error, i_nWritten);
        }));
    }

    void OnOutboundWriteCompleted(const asio::error_code& i_error, size_t i_nWritten)
    {
        TraceOut(__LINE__);

        if (i_error) { AbortOut(__LINE__); return; }
        if (i_nWritten != 4) { AbortOut(__LINE__); return; } // "msg\n"

        TraceOut(__LINE__);
        m_outboundSocket->shutdown(asio::socket_base::shutdown_both);

        asio::async_read_until(*m_outboundSocket, m_outboundRxBuf, '\n',
            m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nRead)
        {
            OnOutboundWaitCompleted(i_error, i_nRead);
        }));
    }

    void OnOutboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
    {
        TraceOut(__LINE__);

        if (i_error != asio::error::eof) { AbortOut(__LINE__); return; }
        if (i_nRead != 0) { AbortOut(__LINE__); return; }

        if (s_freeOutboundSocket)
        {
            m_outboundSocket.reset();
        }
    }

private:
    //////////////
    // Inbound

    asio::io_service::strand m_inboundStrand;

    asio::ip::tcp::acceptor m_listener;
    std::unique_ptr<asio::ip::tcp::socket> m_inboundSocket;

    asio::streambuf m_inboundRxBuf;
    asio::streambuf m_inboundTxBuf;

    //////////////
    // Outbound

    asio::io_service::strand m_outboundStrand;

    asio::ip::tcp::resolver m_resolver;
    std::unique_ptr<asio::ip::tcp::socket> m_outboundSocket;

    asio::streambuf m_outboundRxBuf;
    asio::streambuf m_outboundTxBuf;

    //////////////
    // Common

    unsigned m_n;

    const std::chrono::high_resolution_clock::time_point m_traceStart;
    std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_inboundTrace;
    std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_outboundTrace;
};

static int Usage(int i_ret)
{
    std::cout << "[" << i_ret << "]" << "Usage: example <nThreads> <nConnections> <inboundFree> <outboundFree>" << std::endl;
    return i_ret;
}

int main(int argc, char* argv[])
{
    if (argc < 5)
        return Usage(__LINE__);

    const unsigned nThreads = unsigned(std::stoul(argv[1]));
    if (nThreads == 0)
        return Usage(__LINE__);
    const unsigned nConnections = unsigned(std::stoul(argv[2]));
    if (nConnections == 0)
        return Usage(__LINE__);

    s_freeInboundSocket = (*argv[3] == 'y');
    s_freeOutboundSocket = (*argv[4] == 'y');

    const uint16_t listenPortBase = 25000;
    const uint16_t connectPortBase = 25000;
    const std::string connectHost = "127.0.0.1";

    asio::io_service ioService;

    std::cout << "Creating." << std::endl;

    std::list<Tester> testers;

    for (unsigned i = 0; i < nConnections; ++i)
    {
        testers.emplace_back(ioService, i);
        testers.back().Listen(listenPortBase);
        testers.back().Connect(connectHost, connectPortBase);
    }

    std::cout << "Starting." << std::endl;

    std::vector<std::thread> threads;

    for (unsigned i = 0; i < nThreads; ++i)
    {   
        threads.emplace_back([&]()
        {
            ioService.run();
        });
    }

    std::cout << "Waiting." << std::endl;

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Stopped." << std::endl;

    return 0;
}

void DumpAllTraces(std::list<Tester>& i_testers)
{
    std::map<unsigned, unsigned> counts;

    for (auto& tester : i_testers)
    {
        tester.DumpTrace(counts);
    }   

    std::cout << "##############################\n";
    for (const auto& count : counts)
    {
        std::cout << count.first << " : " << count.second << "\n";
    }
    std::cout << std::endl;
}

#if defined(ASIO_NO_EXCEPTIONS)
namespace asio
{
        namespace detail
        {

                template <typename Exception>
                void throw_exception(const Exception& e)
                {
            abort();
                }

        } // namespace detail
} // namespace asio
#endif

我们编译如下(问题只出现在优化构建中):

g++ -o example -m64 -g -O3 --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include -lpthread example.cpp

我们运行正在使用 Debian Jessie。 uname -a 报道 (Linux <hostname> 3.16.0-4-amd64 #1 SMP Debian 3.16.36-1+deb8u2 (2016-10-19) x86_64 GNU/Linux。 该问题同时出现在 GCC (g++ (Debian 4.9.2-10) 4.9.2) 和 Clang (Debian clang version 3.5.0-10 (tags/RELEASE_350/final) (based on LLVM 3.5.0)) 下。

[编辑添加:它也发生在 Debian Stretch Linux <hostname> 4.6.0-1-amd64 #1 SMP Debian 4.6.1-1 (2016-06-06) x86_64 GNU/Linuxg++ (Debian 6.2.1-5) 6.2.1 20161124 上。]

总之,测试应用执行以下操作:

如果我们指定所有套接字都保持原样,则测试循环 运行 会无限期地进行。为了避免因 SO_REUSEADDR 考虑而混淆水域,我们注意在开始测试之前没有套接字处于先前测试 运行 的 TIME_WAIT 状态,否则侦听可能会失败。但是在满足这个警告的情况下,测试应用程序 运行s 几乎是数百次,甚至数千次都没有错误。同样,如果我们指定应显式释放入站套接字(但不是出站套接字),所有 运行 都可以。

但是,如果我们指定应释放出站套接字,则应用程序会在执行可变次数后停止运行 - 有时执行十次或更少,有时一百次或更多,通常介于两者之间。

使用 GDB 连接到停滞的进程,我们看到主线程正在等待加入工作线程,除了一个工作线程外,所有工作线程都处于空闲状态(等待 Asio 内部条件变量),并且那个工作线程线程正在等待 Asio 对 epoll() 的调用。内部跟踪检测验证某些套接字正在等待异步操作完成 - 有时是初始(入站)接受,有时是(入站)数据读取,有时是通常以 [=21= 完成的最终入站或出站读取].

在所有情况下,连接的另一端已成功完成其工作:如果入站接受仍未决,我们会看到相应的出站连接已成功完成,同时出站写入也已成功完成;同样,如果入站数据读取处于挂起状态,则相应的出站连接和写入已完成;如果入站 EOF 读取处于挂起状态,则出站关闭已执行,同样,如果出站 EOF 读取处于挂起状态,则入站 EOF 读取已因出站关闭而完成。

检查进程的 /proc/N/fdinfo 表明 epoll 文件描述符确实在等待检测指示的文件描述符。

最令人费解的是,netstat 显示等待套接字的 RecvQ 大小为非零 - 也就是说,有待处理的读取操作的套接字显示为已准备好接收数据或关闭事件以供读取。这与我们的检测是一致的,因为它表明写入数据已传送到入站套接字,但尚未被读取(或者出站关闭已向入站端发出 FIN,但 EOF 尚未还 'read').

这让我怀疑 Asio 的 epoll 簿记 - 特别是它的边缘触发事件管理 - 由于竞争条件而在某处不同步。很明显这很有可能是我操作不当造成的,但是我看不出问题出在哪里。

我们将不胜感激所有见解、建议、已知问题和指出明显的错误。

[编辑添加:使用 strace 捕获内核调用会干扰执行,因此不会发生停顿。使用 sysdig 没有这种效果,但它目前不捕获 epoll_waitepoll_ctl 系统调用的参数。叹息。]

这似乎已由 ASIO 的维护者解决:

https://github.com/chriskohlhoff/asio/issues/180

https://github.com/chriskohlhoff/asio/commit/669e6b8b9de1309927b29d8b6be3630cc69c07ac