Boost::Asio io_context::run Unknown Visual C++ Runtime Error : Debug Error

Boost::Asio io_context::run Unknown Visual C++ Runtime Error : Debug Error

提升版本:1.68

C++ 标准:C++17

开发平台:MSVC 2017

操作系统:Windows10 Professional

PC 架构:x64

我正在使用 Boost::Asio 创建异步 TCP 连接。在第一次成功连接期间,一切正常。由于某些问题,它套接字中断,它尝试重新连接,那时我收到 运行-time 错误。即使我收到 运行 时间错误,程序仍然能够接收数据。

起初我试图在主函数的 while(无限)循环(主线程)中重新连接套接字,但出现错误。 我在

中遇到错误

D:\vcpkg\installed\x64-windows\include\boost\asio\detail\impl\win_iocp_io_context.ipp

size_t win_iocp_io_context::run(boost::system::error_code& ec)
{
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  {
    stop();
    ec = boost::system::error_code();
    return 0;
  }

  win_iocp_thread_info this_thread;
  thread_call_stack::context ctx(this, this_thread);

  size_t n = 0;
  while (do_one(INFINITE, ec))
    if (n != (std::numeric_limits<size_t>::max)())
      ++n;
  return n;
}

当 n = 13 时在线

while (do_one(INFINITE, ec))

我通过在我的接收和发送处理程序中添加我的连接调用来解决错误,一旦他们检测到连接断开或套接字已关闭。

有人可以解释为什么我在尝试从主线程重新连接时遇到问题,而当我在 io_context 线程中的套接字损坏后立即尝试重新连接时问题得到解决。

在连接丢失和 io_context 线程退出后调用 运行 之前,我确实重启了上下文。

我的主要函数中的 While 循环:

while (true)
{
    fmt::print("Socket Alive : {}\n", as.isSocketAlive());
    while(not as.isSocketAlive() and not as.isConnectionInProcess())
        as.connectSocket();
    if (not as.isSocketAlive())
        continue;
    if (as.isReadComplete())
        as.sendDataSocket(as.getRecievedData());
}   

我的异步套接字函数:

void AsyncSocket::runContext(void)
{
    std::atomic_store(std::addressof(this->contextExitted), false);
    this->context.run();
    std::atomic_store(std::addressof(this->contextExitted), true);
}

void AsyncSocket::restartContext(void)
{
    if (std::atomic_load(std::addressof(this->contextExitted)))
        this->context.restart();
}

void AsyncSocket::connectSocket(void)
{
    std::atomic_store(std::addressof(this->socketAlive), false);
    std::atomic_store(std::addressof(this->connectionInProcess), true);
    //this->socket.async_connect(this->endpoint, std::bind(&AsyncSocket::connectHandler, this, boost::asio::placeholders::error));
    this->socket.async_connect(this->endpoint, boost::bind(&AsyncSocket::connectHandler, this, boost::asio::placeholders::error));
    //this->socket.connect(this->endpoint, );
    if (this->context.stopped())
    {
        if (this->isContextExitted())
            this->restartContext();
        this->t = std::thread(&AsyncSocket::runContext, this);
    }
    std::call_once(this->firstRun, [this]() {
        t = std::thread(&AsyncSocket::runContext, this);
    });
}

void AsyncSocket::recieveHandler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    fmt::print("In recieve handler, Error Code : {}\nBytes recieved : {}\n", ec.message(), bytes_transferred);
    try
    {
        std::atomic_store(std::addressof(this->receivingData), false);
        if (ec not_eq boost::system::errc::success)
        {
#ifdef _DEBUG
            fmt::print("Error in WebSocket::recieveHandler. Error : {0}\n", ec.message());
#endif // _DEBUG
            LOG_ERROR << ec.message();
            switch (ec.value())
            {
            case boost::asio::error::eof :
            case boost::asio::error::connection_reset :
            case boost::asio::error::connection_aborted :
            case boost::asio::error::network_reset :
            case boost::asio::error::network_down :
            case boost::asio::error::network_unreachable :
                if (this->isSocketAlive() and this->socket.is_open())
                    this->socket.close();
                std::atomic_store(std::addressof(this->socketAlive), false);
                this->connectSocket();// If I comment this line and try to reconnect in my main function (infinite while loop), I get mentioned run-time error
                return;
            default:
                break;
            }

        }
        else
        {
            this->readDataQueue.push(std::string(reinterpret_cast <const char *> (this->readDataBuffer.data()), bytes_transferred));
            std::atomic_store(std::addressof(this->readComplete), true);
        }
    }
    catch (const std::exception& ex)
    {
#ifdef _DEBUG
        fmt::print("Error in WebSocket::sendHandler. Error : {0}\n", ex.what());
#endif // _DEBUG
        LOG_ERROR << "Exception : " << ex.what() << "Data : " << this->writeDataQueue.front();
    }
    this->recieveDataSocket();
}



void AsyncSocket::sendHandler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    fmt::print("In send handler, Error Code : {}\nBytes recieved : {}\n", ec.message(), bytes_transferred);
    try
    {
        if (ec not_eq boost::system::errc::success)
        {
#ifdef _DEBUG
            fmt::print("Error in WebSocket::recieveHandler. Error : {0}\n", ec.message());
#endif // _DEBUG
            LOG_ERROR << ec.message();
            switch (ec.value())
            {
            case boost::asio::error::eof:
            case boost::asio::error::connection_reset:
            case boost::asio::error::connection_aborted:
            case boost::asio::error::network_reset:
            case boost::asio::error::network_down:
            case boost::asio::error::network_unreachable:
                if (this->isSocketAlive() and this->socket.is_open())
                    this->socket.close();
                std::atomic_store(std::addressof(this->socketAlive), false);
                this->connectSocket();// If I comment this line and try to reconnect in my main function (infinite while loop), I get mentioned run-time error
                return;
            default:
                break;
            }

        }
        this->writeDataQueue.pop();
        std::atomic_init(std::addressof(this->sendingData), false);
        this->writeSocket();
    }
    catch (const std::exception& ex)
    {
#ifdef _DEBUG
        fmt::print("Error in WebSocket::sendHandler. Error : {0}\n", ex.what());
#endif // _DEBUG
        LOG_ERROR << "Exception : " << ex.what() << "Data : " << this->writeDataQueue.front();
    }
}

abortstd::terminate 称为默认操作。 terminate 当一个线程被销毁或被 operator=(thread&&) 覆盖并且它处于 joinable 状态时被调用。

这段代码不安全:

connectSocket() {
...
if (this->context.stopped())
{
    if (this->isContextExitted())
        this->restartContext();
    this->t = std::thread(&AsyncSocket::runContext, this); // [1]
}
std::call_once(this->firstRun, [this]() {
    t = std::thread(&AsyncSocket::runContext, this); // [2]
});

第一次调用 connectSocket 行 [2] 被执行并启动线程。在第 N 次调用 connectSocket 时,如果 context.stopped() returns 为真,您正在创建新线程并将其分配给 t - 处于可连接状态的线程,此时 terminate 被调用。在移动分配之前,您应该加入 t 线程。