当我将客户端连接到服务器而不是阻塞和等待消息时,Recv() 返回 SOCKET_ERROR

Recv() returning SOCKET_ERROR when I connect a client to the server instead of blocking and waiting for message

我对 C++ 中的网络编程和多线程处理还比较陌生。目前我的 recv() 调用 returns 一个未知错误。我不太确定错误来自何处,希望能提供一些帮助。

我本地是用putty连接服务器的

class Threaded_TCPListener{

    int Threaded_TCPListener::Init()
    {
        // Initializing WinSock
        WSADATA wsData;
        WORD ver = MAKEWORD(2,2);

        int winSock = WSAStartup(ver, &wsData);
        if(winSock != 0)
            return winSock;

        // Creating listening socket
        this->socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        if(this->socket == INVALID_SOCKET)
            return WSAGetLastError();

        // Fill sockaddr with ip addr and port
        sockaddr_in hint;
        hint.sin_family = AF_INET;
        hint.sin_port = htons(this->port);
        inet_pton(AF_INET, this->ipAddress, &hint.sin_addr);

        // Bind hint to socket
        if(bind(this->socket, (sockaddr*)&hint, sizeof(hint)) == SOCKET_ERROR)
            return WSAGetLastError();

        // Start listening on socket
        if(listen(this->socket, SOMAXCONN) == SOCKET_ERROR)
            return WSAGetLastError();

        // Accept first client
        this->createAcceptThread();

        return 0;
    }

    int Threaded_TCPListener::Run()
    {
        bool isRunning = true;

        // Read from all clients
        std::vector<std::thread> threads;
        threads.reserve(this->clients.size());

        // Recv from client sockets
        for (int i=0; i < this->clients.size(); ++i)
        {
            threads.emplace_back(std::thread(&Threaded_TCPListener::receiveFromSocket, this, socket));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

        return 0;
    }

    void Threaded_TCPListener::onMessageReceived(int clientSocket, const char* msg, int length)
    {
        Threaded_TCPListener::broadcastToClients(clientSocket, msg, length);

        std::thread t(&Threaded_TCPListener::receiveFromSocket, this, clientSocket);

        t.detach();

        return;
    }

    void Threaded_TCPListener::sendMessageToClient(int clientSocket, const char * msg, int length)
    {
        send(clientSocket, msg, length, 0);

        return;
    }

    void Threaded_TCPListener::broadcastToClients(int senderSocket, const char * msg, int length)
    {
        std::vector<std::thread> threads;
        threads.reserve(clients.size());

        // Iterate over all clients
        for (int sendSock : this->clients)
        {
            if(sendSock != senderSocket)
                threads.emplace_back(std::thread(&Threaded_TCPListener::sendMessageToClient, this,sendSock, msg, length));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
            t.join();

        return;
    }

    void Threaded_TCPListener::createAcceptThread()
    {
        // Start accepting clients on a new thread
        this->listeningThread = std::thread(&Threaded_TCPListener::acceptClient, this);

        this->listeningThread.detach();

        return;
    }

    void Threaded_TCPListener::acceptClient()
    {
        int client = accept(this->socket, nullptr, nullptr);

        // Error
        if(client == INVALID_SOCKET)
        {
            std::printf("Accept Err: %d\n", WSAGetLastError());
        }
        // Add client to clients queue
        else
        {
            // Add client to queue
            this->clients.emplace(client);

            // Client Connect Confirmation
            onClientConnected(client); // Prints msg on server

            // Create another thread to accept more clients
            this->createAcceptThread();
        }

        return;
    }

    void Threaded_TCPListener::receiveFromSocket(int receivingSocket)
    {
        // Byte storage
        char buff[MAX_BUFF_SIZE];

        // Clear buff
        memset(buff, 0, sizeof(buff));

        // Receive msg
        int bytesRecvd = recv(receivingSocket, buff, MAX_BUFF_SIZE, 0);
        if(bytesRecvd <= 0)
        {
            char err_buff[1024];
            strerror_s(err_buff, bytesRecvd);

            std::cerr << err_buff;
            // Close client
            this->clients.erase(receivingSocket);
            closesocket(receivingSocket);

            onClientDisconnected(receivingSocket); // Prints msg on server
        }
        else
        {
            onMessageReceived(receivingSocket, buff, bytesRecvd);
        }
    }
}

我正在尝试创建一个多线程 TCP 'server',它将通过连续接受线程 运行(侦听新连接)和一个等待接收块的线程来处理传入的客户端每个连接到服务器的客户端。

代码中存在关于如何实现 TCP 服务器的误解:

您似乎假设您可以拥有一个可以处理所有通信的服务器套接字文件描述符。不是这种情况。您必须有一个专用的套接字文件描述符,仅用于侦听和接受传入连接,然后每个现有连接都有一个额外的文件描述符。

在您的代码中,我看到您总是使用侦听套接字调用 receiveFromSocket()。这是错误的。在循环中为所有客户端调用 receiveFromSocket() 也是错误的。

您更需要做的是: - 有一个在循环中调用 accept() 的专用线程。从多个线程调用 accept() 没有性能优势。 - 一个 accept() returns 一个新连接,您会生成一个新线程,该线程在循环中调用 recv()。然后这将阻止并等待您在问题中期望的新数据。

您还需要改掉从新线程调用单个函数的习惯。这不是多线程编程。一个线程通常包含一个循环。其他一切通常都是设计缺陷。

另请注意,多线程编程在 2019 年仍然是火箭科学,尤其是在 C++ 中。如果您不是绝对的专家,您将无法做到。另请注意,多线程编程的绝对专家将尽可能避免多线程编程。许多看似并发的 I/O 绑定的任务可以更好地由基于事件的单线程系统处理。

  1. 你的 Init 看起来不错:

    • 创建套接字,绑定它,监听它,启动接受线程
  2. 在您的接受线程中,acceptClient 看起来还不错:

    • 打印一些消息
    • 将客户端套接字添加到 clients 队列
    • 创建一个新的接受线程
  3. 你的Run没有意义:

    • clients 中的每个元素创建一个线程以从监听器接收 socket

看起来您正在为每个套接字操作生成一个新线程。这是一个相当浪费的设计。一旦线程完成,它就可以回去做其他事情。

因此在 acceptClient 中创建一个新的接受线程是一种浪费,您可以直接循环回到开头 ::accept 下一个客户端。像这样:

acceptClient() {
  while (alive) {
    int client = accept(socket, ...);
    createClientHandler(client);
  }
}

似乎缺少的是生成一个新的客户端线程来为客户端套接字提供服务。您目前在 Run 中执行此操作,但那是在实际接受任何客户端之前。而你是为了错误的套接字而做的!相反,您应该在 acceptClient 中生成 receiveFromSocket 线程,将其传递给 client 套接字。所以这是一个错误。

在您的 receiveFromSocket 中,您也不需要再次创建另一个线程 receiveFromSocket -- 只需循环回到开头。

这种每个操作线程设计的最大问题是您在每条传入消息上都生成发件人线程。这意味着您实际上可以有多个发送线程尝试在同一个 TCP 套接字上 ::send。那不是很安全。

The order of calls made to WSASend is also the order in which the buffers are transmitted to the transport layer. WSASend should not be called on the same stream-oriented socket concurrently from different threads, because some Winsock providers may split a large send request into multiple transmissions, and this may lead to unintended data interleaving from multiple concurrent send requests on the same stream-oriented socket.

https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsasend

类似地,我建议您不要在 broadcastToClients 中生成线程,而是在 acceptClient 中为每个客户端套接字生成一个持久发送方线程(连同一些 [=] 中的 receiveFromSocket 线程32=]).

要与发送方线程通信,您应该使用线程安全的阻塞队列。每个发件人线程将如下所示:

while (alive) {
  msg = queue.next_message();
  send(client_socket, msg);
}

然后在收到消息后您只需执行以下操作:

for (client : clients) {
  client.queue.put_message(msg);
}

总而言之,要处理每个客户,您需要这样的结构:

struct Client {
  int client_socket;
  BlockingQueue queue;

  // optionally if you want to keep track of your threads
  // to be able to safely clean up
  std::thread recv_thread, send_thread;
};

安全清理完全是另一回事。


最后,在您的代码中对这条评论进行评论:

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

这几乎与 std::thread::detach 的作用相反: https://en.cppreference.com/w/cpp/thread/thread/detach 它允许您销毁线程对象而不必等待线程完成执行。