Linux 上的 Asio 在 epoll() 中停滞
Asio on Linux stalls in epoll()
我们在 Linux 上独立(非 Boost)Asio 1.10.6 的异步操作遇到问题,使用以下测试应用程序进行了演示:
#define ASIO_STANDALONE
#define ASIO_HEADER_ONLY
#define ASIO_NO_EXCEPTIONS
#define ASIO_NO_TYPEID
#include "asio.hpp"
#include <chrono>
#include <iostream>
#include <list>
#include <map>
#include <thread>
static bool s_freeInboundSocket = false;
static bool s_freeOutboundSocket = false;
class Tester
{
public:
Tester(asio::io_service& i_ioService, unsigned i_n)
: m_inboundStrand(i_ioService)
, m_listener(i_ioService)
, m_outboundStrand(i_ioService)
, m_resolver(i_ioService)
, m_n(i_n)
, m_traceStart(std::chrono::high_resolution_clock::now())
{}
~Tester()
{}
void TraceIn(unsigned i_line)
{
m_inboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}
void AbortIn(unsigned i_line)
{
TraceIn(i_line);
abort();
}
void TraceOut(unsigned i_line)
{
m_outboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}
void AbortOut(unsigned i_line)
{
TraceOut(i_line);
abort();
}
void DumpTrace(std::map<unsigned, unsigned>& o_counts)
{
std::cout << "## " << m_n << " ##\n";
std::cout << "-- " << m_traceStart.time_since_epoch().count() << "\n";
std::cout << "- in - - out -\n";
auto in = m_inboundTrace.begin();
auto out = m_outboundTrace.begin();
while ((in != m_inboundTrace.end()) || (out != m_outboundTrace.end()))
{
if (in == m_inboundTrace.end())
{
++o_counts[out->first];
std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else if (out == m_outboundTrace.end())
{
++o_counts[in->first];
std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
else if (out->second < in->second)
{
++o_counts[out->first];
std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else
{
++o_counts[in->first];
std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
}
std::cout << std::endl;
}
//////////////
// Inbound
void Listen(uint16_t i_portBase)
{
m_inboundSocket.reset(new asio::ip::tcp::socket(m_inboundStrand.get_io_service()));
asio::error_code ec;
if (m_listener.open(asio::ip::tcp::v4(), ec)
|| m_listener.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), i_portBase+m_n), ec)
|| m_listener.listen(-1, ec))
{
AbortIn(__LINE__); return;
}
TraceIn(__LINE__);
m_listener.async_accept(*m_inboundSocket,
m_inboundStrand.wrap([this](const asio::error_code& i_error)
{
OnInboundAccepted(i_error);
}));
}
void OnInboundAccepted(const asio::error_code& i_error)
{
TraceIn(__LINE__);
if (i_error) { AbortIn(__LINE__); return; }
asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundReadCompleted(i_err, i_nRd);
}));
}
void OnInboundReadCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);
if (i_error.value() != 0) { AbortIn(__LINE__); return; }
if (bool(i_error)) { AbortIn(__LINE__); return; }
if (i_nRead != 4) { AbortIn(__LINE__); return; } // "msg\n"
std::istream is(&m_inboundRxBuf);
std::string s;
if (!std::getline(is, s)) { AbortIn(__LINE__); return; }
if (s != "msg") { AbortIn(__LINE__); return; }
if (m_inboundRxBuf.in_avail() != 0) { AbortIn(__LINE__); return; }
asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundWaitCompleted(i_err, i_nRd);
}));
}
void OnInboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);
if (i_error != asio::error::eof) { AbortIn(__LINE__); return; }
if (i_nRead != 0) { AbortIn(__LINE__); return; }
if (s_freeInboundSocket)
{
m_inboundSocket.reset();
}
}
//////////////
// Outbound
void Connect(std::string i_host, uint16_t i_portBase)
{
asio::error_code ec;
auto endpoint = m_resolver.resolve(asio::ip::tcp::resolver::query(i_host, std::to_string(i_portBase+m_n)), ec);
if (ec) { AbortOut(__LINE__); return; }
m_outboundSocket.reset(new asio::ip::tcp::socket(m_outboundStrand.get_io_service()));
TraceOut(__LINE__);
asio::async_connect(*m_outboundSocket, endpoint,
m_outboundStrand.wrap([this](const std::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_ep)
{
OnOutboundConnected(i_error, i_ep);
}));
}
void OnOutboundConnected(const asio::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_endpoint)
{
TraceOut(__LINE__);
if (i_error) { AbortOut(__LINE__); return; }
std::ostream(&m_outboundTxBuf) << "msg" << '\n';
asio::async_write(*m_outboundSocket, m_outboundTxBuf.data(),
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nWritten)
{
OnOutboundWriteCompleted(i_error, i_nWritten);
}));
}
void OnOutboundWriteCompleted(const asio::error_code& i_error, size_t i_nWritten)
{
TraceOut(__LINE__);
if (i_error) { AbortOut(__LINE__); return; }
if (i_nWritten != 4) { AbortOut(__LINE__); return; } // "msg\n"
TraceOut(__LINE__);
m_outboundSocket->shutdown(asio::socket_base::shutdown_both);
asio::async_read_until(*m_outboundSocket, m_outboundRxBuf, '\n',
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nRead)
{
OnOutboundWaitCompleted(i_error, i_nRead);
}));
}
void OnOutboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceOut(__LINE__);
if (i_error != asio::error::eof) { AbortOut(__LINE__); return; }
if (i_nRead != 0) { AbortOut(__LINE__); return; }
if (s_freeOutboundSocket)
{
m_outboundSocket.reset();
}
}
private:
//////////////
// Inbound
asio::io_service::strand m_inboundStrand;
asio::ip::tcp::acceptor m_listener;
std::unique_ptr<asio::ip::tcp::socket> m_inboundSocket;
asio::streambuf m_inboundRxBuf;
asio::streambuf m_inboundTxBuf;
//////////////
// Outbound
asio::io_service::strand m_outboundStrand;
asio::ip::tcp::resolver m_resolver;
std::unique_ptr<asio::ip::tcp::socket> m_outboundSocket;
asio::streambuf m_outboundRxBuf;
asio::streambuf m_outboundTxBuf;
//////////////
// Common
unsigned m_n;
const std::chrono::high_resolution_clock::time_point m_traceStart;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_inboundTrace;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_outboundTrace;
};
static int Usage(int i_ret)
{
std::cout << "[" << i_ret << "]" << "Usage: example <nThreads> <nConnections> <inboundFree> <outboundFree>" << std::endl;
return i_ret;
}
int main(int argc, char* argv[])
{
if (argc < 5)
return Usage(__LINE__);
const unsigned nThreads = unsigned(std::stoul(argv[1]));
if (nThreads == 0)
return Usage(__LINE__);
const unsigned nConnections = unsigned(std::stoul(argv[2]));
if (nConnections == 0)
return Usage(__LINE__);
s_freeInboundSocket = (*argv[3] == 'y');
s_freeOutboundSocket = (*argv[4] == 'y');
const uint16_t listenPortBase = 25000;
const uint16_t connectPortBase = 25000;
const std::string connectHost = "127.0.0.1";
asio::io_service ioService;
std::cout << "Creating." << std::endl;
std::list<Tester> testers;
for (unsigned i = 0; i < nConnections; ++i)
{
testers.emplace_back(ioService, i);
testers.back().Listen(listenPortBase);
testers.back().Connect(connectHost, connectPortBase);
}
std::cout << "Starting." << std::endl;
std::vector<std::thread> threads;
for (unsigned i = 0; i < nThreads; ++i)
{
threads.emplace_back([&]()
{
ioService.run();
});
}
std::cout << "Waiting." << std::endl;
for (auto& thread : threads)
{
thread.join();
}
std::cout << "Stopped." << std::endl;
return 0;
}
void DumpAllTraces(std::list<Tester>& i_testers)
{
std::map<unsigned, unsigned> counts;
for (auto& tester : i_testers)
{
tester.DumpTrace(counts);
}
std::cout << "##############################\n";
for (const auto& count : counts)
{
std::cout << count.first << " : " << count.second << "\n";
}
std::cout << std::endl;
}
#if defined(ASIO_NO_EXCEPTIONS)
namespace asio
{
namespace detail
{
template <typename Exception>
void throw_exception(const Exception& e)
{
abort();
}
} // namespace detail
} // namespace asio
#endif
我们编译如下(问题只出现在优化构建中):
g++ -o example -m64 -g -O3 --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include -lpthread example.cpp
我们运行正在使用 Debian Jessie。 uname -a
报道 (Linux <hostname> 3.16.0-4-amd64 #1 SMP Debian 3.16.36-1+deb8u2 (2016-10-19) x86_64 GNU/Linux
。
该问题同时出现在 GCC (g++ (Debian 4.9.2-10) 4.9.2
) 和 Clang (Debian clang version 3.5.0-10 (tags/RELEASE_350/final) (based on LLVM 3.5.0)
) 下。
[编辑添加:它也发生在 Debian Stretch Linux <hostname> 4.6.0-1-amd64 #1 SMP Debian 4.6.1-1 (2016-06-06) x86_64 GNU/Linux
和 g++ (Debian 6.2.1-5) 6.2.1 20161124
上。]
总之,测试应用执行以下操作:
我们创建N个连接,每个连接包含一个入站(监听)
端和出站(连接)端。每个入站监听器都被绑定
到一个唯一的端口(从 25000 开始),以及每个出站连接器
使用系统选择的始发端口。
入站端执行async_accept
,并在
完成发出 async_read
。当读取完成时,它会发出
另一个 async_read 我们期望 return eof
。当那个
完成后,我们要么立即释放套接字,要么保持原样
(没有挂起的异步操作)由相关的清理
程序退出时的析构函数。 (注意监听套接字是
始终保持原样,没有等待接受,直到退出。)
出站端执行一次async_connect
,关于完成问题
一个 async_write
。写入完成后,它会发出 shutdown
(具体来说,shutdown(both)
)后跟 async_read
我们
预计 return eof
。完成后,我们再次要么离开
socket 原样,没有挂起的操作,或者我们立即释放它。
任何错误或意外接收数据都会导致立即 abort()
打电话。
测试应用让我们为
io_service
,以及要创建的连接总数,如
以及控制入站和出站套接字的标志
分别被释放或保持原样。
我们运行反复测试应用,指定50个线程和1000个
连接。
即while ./example 50 1000 n y >out.txt ; do echo -n . ; done
如果我们指定所有套接字都保持原样,则测试循环 运行 会无限期地进行。为了避免因 SO_REUSEADDR
考虑而混淆水域,我们注意在开始测试之前没有套接字处于先前测试 运行 的 TIME_WAIT
状态,否则侦听可能会失败。但是在满足这个警告的情况下,测试应用程序 运行s 几乎是数百次,甚至数千次都没有错误。同样,如果我们指定应显式释放入站套接字(但不是出站套接字),所有 运行 都可以。
但是,如果我们指定应释放出站套接字,则应用程序会在执行可变次数后停止运行 - 有时执行十次或更少,有时一百次或更多,通常介于两者之间。
使用 GDB 连接到停滞的进程,我们看到主线程正在等待加入工作线程,除了一个工作线程外,所有工作线程都处于空闲状态(等待 Asio 内部条件变量),并且那个工作线程线程正在等待 Asio 对 epoll()
的调用。内部跟踪检测验证某些套接字正在等待异步操作完成 - 有时是初始(入站)接受,有时是(入站)数据读取,有时是通常以 [=21= 完成的最终入站或出站读取].
在所有情况下,连接的另一端已成功完成其工作:如果入站接受仍未决,我们会看到相应的出站连接已成功完成,同时出站写入也已成功完成;同样,如果入站数据读取处于挂起状态,则相应的出站连接和写入已完成;如果入站 EOF 读取处于挂起状态,则出站关闭已执行,同样,如果出站 EOF 读取处于挂起状态,则入站 EOF 读取已因出站关闭而完成。
检查进程的 /proc/N/fdinfo 表明 epoll 文件描述符确实在等待检测指示的文件描述符。
最令人费解的是,netstat
显示等待套接字的 RecvQ 大小为非零 - 也就是说,有待处理的读取操作的套接字显示为已准备好接收数据或关闭事件以供读取。这与我们的检测是一致的,因为它表明写入数据已传送到入站套接字,但尚未被读取(或者出站关闭已向入站端发出 FIN,但 EOF 尚未还 'read').
这让我怀疑 Asio 的 epoll
簿记 - 特别是它的边缘触发事件管理 - 由于竞争条件而在某处不同步。很明显这很有可能是我操作不当造成的,但是我看不出问题出在哪里。
我们将不胜感激所有见解、建议、已知问题和指出明显的错误。
[编辑添加:使用 strace
捕获内核调用会干扰执行,因此不会发生停顿。使用 sysdig
没有这种效果,但它目前不捕获 epoll_wait
和 epoll_ctl
系统调用的参数。叹息。]
我们在 Linux 上独立(非 Boost)Asio 1.10.6 的异步操作遇到问题,使用以下测试应用程序进行了演示:
#define ASIO_STANDALONE
#define ASIO_HEADER_ONLY
#define ASIO_NO_EXCEPTIONS
#define ASIO_NO_TYPEID
#include "asio.hpp"
#include <chrono>
#include <iostream>
#include <list>
#include <map>
#include <thread>
static bool s_freeInboundSocket = false;
static bool s_freeOutboundSocket = false;
class Tester
{
public:
Tester(asio::io_service& i_ioService, unsigned i_n)
: m_inboundStrand(i_ioService)
, m_listener(i_ioService)
, m_outboundStrand(i_ioService)
, m_resolver(i_ioService)
, m_n(i_n)
, m_traceStart(std::chrono::high_resolution_clock::now())
{}
~Tester()
{}
void TraceIn(unsigned i_line)
{
m_inboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}
void AbortIn(unsigned i_line)
{
TraceIn(i_line);
abort();
}
void TraceOut(unsigned i_line)
{
m_outboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}
void AbortOut(unsigned i_line)
{
TraceOut(i_line);
abort();
}
void DumpTrace(std::map<unsigned, unsigned>& o_counts)
{
std::cout << "## " << m_n << " ##\n";
std::cout << "-- " << m_traceStart.time_since_epoch().count() << "\n";
std::cout << "- in - - out -\n";
auto in = m_inboundTrace.begin();
auto out = m_outboundTrace.begin();
while ((in != m_inboundTrace.end()) || (out != m_outboundTrace.end()))
{
if (in == m_inboundTrace.end())
{
++o_counts[out->first];
std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else if (out == m_outboundTrace.end())
{
++o_counts[in->first];
std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
else if (out->second < in->second)
{
++o_counts[out->first];
std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else
{
++o_counts[in->first];
std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
}
std::cout << std::endl;
}
//////////////
// Inbound
void Listen(uint16_t i_portBase)
{
m_inboundSocket.reset(new asio::ip::tcp::socket(m_inboundStrand.get_io_service()));
asio::error_code ec;
if (m_listener.open(asio::ip::tcp::v4(), ec)
|| m_listener.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), i_portBase+m_n), ec)
|| m_listener.listen(-1, ec))
{
AbortIn(__LINE__); return;
}
TraceIn(__LINE__);
m_listener.async_accept(*m_inboundSocket,
m_inboundStrand.wrap([this](const asio::error_code& i_error)
{
OnInboundAccepted(i_error);
}));
}
void OnInboundAccepted(const asio::error_code& i_error)
{
TraceIn(__LINE__);
if (i_error) { AbortIn(__LINE__); return; }
asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundReadCompleted(i_err, i_nRd);
}));
}
void OnInboundReadCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);
if (i_error.value() != 0) { AbortIn(__LINE__); return; }
if (bool(i_error)) { AbortIn(__LINE__); return; }
if (i_nRead != 4) { AbortIn(__LINE__); return; } // "msg\n"
std::istream is(&m_inboundRxBuf);
std::string s;
if (!std::getline(is, s)) { AbortIn(__LINE__); return; }
if (s != "msg") { AbortIn(__LINE__); return; }
if (m_inboundRxBuf.in_avail() != 0) { AbortIn(__LINE__); return; }
asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundWaitCompleted(i_err, i_nRd);
}));
}
void OnInboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);
if (i_error != asio::error::eof) { AbortIn(__LINE__); return; }
if (i_nRead != 0) { AbortIn(__LINE__); return; }
if (s_freeInboundSocket)
{
m_inboundSocket.reset();
}
}
//////////////
// Outbound
void Connect(std::string i_host, uint16_t i_portBase)
{
asio::error_code ec;
auto endpoint = m_resolver.resolve(asio::ip::tcp::resolver::query(i_host, std::to_string(i_portBase+m_n)), ec);
if (ec) { AbortOut(__LINE__); return; }
m_outboundSocket.reset(new asio::ip::tcp::socket(m_outboundStrand.get_io_service()));
TraceOut(__LINE__);
asio::async_connect(*m_outboundSocket, endpoint,
m_outboundStrand.wrap([this](const std::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_ep)
{
OnOutboundConnected(i_error, i_ep);
}));
}
void OnOutboundConnected(const asio::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_endpoint)
{
TraceOut(__LINE__);
if (i_error) { AbortOut(__LINE__); return; }
std::ostream(&m_outboundTxBuf) << "msg" << '\n';
asio::async_write(*m_outboundSocket, m_outboundTxBuf.data(),
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nWritten)
{
OnOutboundWriteCompleted(i_error, i_nWritten);
}));
}
void OnOutboundWriteCompleted(const asio::error_code& i_error, size_t i_nWritten)
{
TraceOut(__LINE__);
if (i_error) { AbortOut(__LINE__); return; }
if (i_nWritten != 4) { AbortOut(__LINE__); return; } // "msg\n"
TraceOut(__LINE__);
m_outboundSocket->shutdown(asio::socket_base::shutdown_both);
asio::async_read_until(*m_outboundSocket, m_outboundRxBuf, '\n',
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nRead)
{
OnOutboundWaitCompleted(i_error, i_nRead);
}));
}
void OnOutboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceOut(__LINE__);
if (i_error != asio::error::eof) { AbortOut(__LINE__); return; }
if (i_nRead != 0) { AbortOut(__LINE__); return; }
if (s_freeOutboundSocket)
{
m_outboundSocket.reset();
}
}
private:
//////////////
// Inbound
asio::io_service::strand m_inboundStrand;
asio::ip::tcp::acceptor m_listener;
std::unique_ptr<asio::ip::tcp::socket> m_inboundSocket;
asio::streambuf m_inboundRxBuf;
asio::streambuf m_inboundTxBuf;
//////////////
// Outbound
asio::io_service::strand m_outboundStrand;
asio::ip::tcp::resolver m_resolver;
std::unique_ptr<asio::ip::tcp::socket> m_outboundSocket;
asio::streambuf m_outboundRxBuf;
asio::streambuf m_outboundTxBuf;
//////////////
// Common
unsigned m_n;
const std::chrono::high_resolution_clock::time_point m_traceStart;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_inboundTrace;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_outboundTrace;
};
static int Usage(int i_ret)
{
std::cout << "[" << i_ret << "]" << "Usage: example <nThreads> <nConnections> <inboundFree> <outboundFree>" << std::endl;
return i_ret;
}
int main(int argc, char* argv[])
{
if (argc < 5)
return Usage(__LINE__);
const unsigned nThreads = unsigned(std::stoul(argv[1]));
if (nThreads == 0)
return Usage(__LINE__);
const unsigned nConnections = unsigned(std::stoul(argv[2]));
if (nConnections == 0)
return Usage(__LINE__);
s_freeInboundSocket = (*argv[3] == 'y');
s_freeOutboundSocket = (*argv[4] == 'y');
const uint16_t listenPortBase = 25000;
const uint16_t connectPortBase = 25000;
const std::string connectHost = "127.0.0.1";
asio::io_service ioService;
std::cout << "Creating." << std::endl;
std::list<Tester> testers;
for (unsigned i = 0; i < nConnections; ++i)
{
testers.emplace_back(ioService, i);
testers.back().Listen(listenPortBase);
testers.back().Connect(connectHost, connectPortBase);
}
std::cout << "Starting." << std::endl;
std::vector<std::thread> threads;
for (unsigned i = 0; i < nThreads; ++i)
{
threads.emplace_back([&]()
{
ioService.run();
});
}
std::cout << "Waiting." << std::endl;
for (auto& thread : threads)
{
thread.join();
}
std::cout << "Stopped." << std::endl;
return 0;
}
void DumpAllTraces(std::list<Tester>& i_testers)
{
std::map<unsigned, unsigned> counts;
for (auto& tester : i_testers)
{
tester.DumpTrace(counts);
}
std::cout << "##############################\n";
for (const auto& count : counts)
{
std::cout << count.first << " : " << count.second << "\n";
}
std::cout << std::endl;
}
#if defined(ASIO_NO_EXCEPTIONS)
namespace asio
{
namespace detail
{
template <typename Exception>
void throw_exception(const Exception& e)
{
abort();
}
} // namespace detail
} // namespace asio
#endif
我们编译如下(问题只出现在优化构建中):
g++ -o example -m64 -g -O3 --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include -lpthread example.cpp
我们运行正在使用 Debian Jessie。 uname -a
报道 (Linux <hostname> 3.16.0-4-amd64 #1 SMP Debian 3.16.36-1+deb8u2 (2016-10-19) x86_64 GNU/Linux
。
该问题同时出现在 GCC (g++ (Debian 4.9.2-10) 4.9.2
) 和 Clang (Debian clang version 3.5.0-10 (tags/RELEASE_350/final) (based on LLVM 3.5.0)
) 下。
[编辑添加:它也发生在 Debian Stretch Linux <hostname> 4.6.0-1-amd64 #1 SMP Debian 4.6.1-1 (2016-06-06) x86_64 GNU/Linux
和 g++ (Debian 6.2.1-5) 6.2.1 20161124
上。]
总之,测试应用执行以下操作:
我们创建N个连接,每个连接包含一个入站(监听) 端和出站(连接)端。每个入站监听器都被绑定 到一个唯一的端口(从 25000 开始),以及每个出站连接器 使用系统选择的始发端口。
入站端执行
async_accept
,并在 完成发出async_read
。当读取完成时,它会发出 另一个 async_read 我们期望 returneof
。当那个 完成后,我们要么立即释放套接字,要么保持原样 (没有挂起的异步操作)由相关的清理 程序退出时的析构函数。 (注意监听套接字是 始终保持原样,没有等待接受,直到退出。)出站端执行一次
async_connect
,关于完成问题 一个async_write
。写入完成后,它会发出shutdown
(具体来说,shutdown(both)
)后跟async_read
我们 预计 returneof
。完成后,我们再次要么离开 socket 原样,没有挂起的操作,或者我们立即释放它。任何错误或意外接收数据都会导致立即
abort()
打电话。测试应用让我们为
io_service
,以及要创建的连接总数,如 以及控制入站和出站套接字的标志 分别被释放或保持原样。我们运行反复测试应用,指定50个线程和1000个 连接。
即
while ./example 50 1000 n y >out.txt ; do echo -n . ; done
如果我们指定所有套接字都保持原样,则测试循环 运行 会无限期地进行。为了避免因 SO_REUSEADDR
考虑而混淆水域,我们注意在开始测试之前没有套接字处于先前测试 运行 的 TIME_WAIT
状态,否则侦听可能会失败。但是在满足这个警告的情况下,测试应用程序 运行s 几乎是数百次,甚至数千次都没有错误。同样,如果我们指定应显式释放入站套接字(但不是出站套接字),所有 运行 都可以。
但是,如果我们指定应释放出站套接字,则应用程序会在执行可变次数后停止运行 - 有时执行十次或更少,有时一百次或更多,通常介于两者之间。
使用 GDB 连接到停滞的进程,我们看到主线程正在等待加入工作线程,除了一个工作线程外,所有工作线程都处于空闲状态(等待 Asio 内部条件变量),并且那个工作线程线程正在等待 Asio 对 epoll()
的调用。内部跟踪检测验证某些套接字正在等待异步操作完成 - 有时是初始(入站)接受,有时是(入站)数据读取,有时是通常以 [=21= 完成的最终入站或出站读取].
在所有情况下,连接的另一端已成功完成其工作:如果入站接受仍未决,我们会看到相应的出站连接已成功完成,同时出站写入也已成功完成;同样,如果入站数据读取处于挂起状态,则相应的出站连接和写入已完成;如果入站 EOF 读取处于挂起状态,则出站关闭已执行,同样,如果出站 EOF 读取处于挂起状态,则入站 EOF 读取已因出站关闭而完成。
检查进程的 /proc/N/fdinfo 表明 epoll 文件描述符确实在等待检测指示的文件描述符。
最令人费解的是,netstat
显示等待套接字的 RecvQ 大小为非零 - 也就是说,有待处理的读取操作的套接字显示为已准备好接收数据或关闭事件以供读取。这与我们的检测是一致的,因为它表明写入数据已传送到入站套接字,但尚未被读取(或者出站关闭已向入站端发出 FIN,但 EOF 尚未还 'read').
这让我怀疑 Asio 的 epoll
簿记 - 特别是它的边缘触发事件管理 - 由于竞争条件而在某处不同步。很明显这很有可能是我操作不当造成的,但是我看不出问题出在哪里。
我们将不胜感激所有见解、建议、已知问题和指出明显的错误。
[编辑添加:使用 strace
捕获内核调用会干扰执行,因此不会发生停顿。使用 sysdig
没有这种效果,但它目前不捕获 epoll_wait
和 epoll_ctl
系统调用的参数。叹息。]