Linux TCP 套接字:客户端已发送数据但服务器仍阻塞读取()

Linux TCP socket: client has sent data but server still blocks on read()

我在 Linux 上有一个使用 TCP 套接字的简单客户端-服务器示例。服务器监听环回地址。客户端连接到服务器并发送一些整数和一个“END”字符串来标记数据的结束。服务器读取数字,将它们全部相加并 returns 总和。但是,即使客户端已成功发送所有数据,我的服务器有时也会阻塞 read()

代码如下:

server.c:

#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define BACKLOG 5

int main(int argc, char *argv[]) {
    struct sockaddr_in addr;
    int down_flag = 0;
    int result = 0;
    int ret = 0;

    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd < 0) {
        perror("Create server socket error: %s\n");
        return 0;
    }

    /* Bind socket to loopback address */
    memset((void *) &addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (bind(sfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) == -1) {
        perror("Bind server socket failed");
        goto _exit;
    }

    if (listen(sfd, BACKLOG) == -1) {
        perror("Listen failed");
        goto _exit;
    }

    ssize_t num_rd = 0;
    char buf[100] = {0};
    for (;;)
    {
        printf("Waiting to accept a connection...\n");
        int cfd = accept(sfd, NULL, NULL);
        printf("Accepted socket fd = %d\n", cfd);
        result = 0;
        while ((num_rd = read(cfd, buf, sizeof(buf))) > 0) {
            /* Ensure the buffer is 0-terminated */
            buf[sizeof(buf) - 1] = 0;
            printf("Read data: %s\n", buf);

            /* Handle commands */
            if (!strncmp(buf, "DOWN", sizeof(buf))) {
                down_flag = 1;
                break;
            }
            if (!strncmp(buf, "END", sizeof(buf))) {
                break;
            }
            /* Add received summand */
            result += atoi(buf);
        }
        if (-1 == num_rd) {
            perror("Read error");
        }

        /* Send result */
        sprintf(buf, "%d", result);
        ret = write(cfd, buf, sizeof(buf));
        if (-1 == ret) {
            perror("Write error\n");
            goto _exit;
        }
        close(cfd);
        /* Quit on DOWN command */
        if (down_flag) {
            break;
        }
    }
_exit:
    close(sfd);
    return 0;
}

client.c:

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int main(int argc, char *argv[]) {
    struct sockaddr_in addr;
    int ret;
    int data_socket;
    char buf[100] = {0};
    int i = 0;

    data_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == data_socket) {
        perror("Create client socket error");
        exit(EXIT_FAILURE);
    }

    /* Connect to server socket */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    ret = connect(data_socket, (const struct sockaddr *) &addr, sizeof(addr));
    if (-1 == ret) {
        perror("Connect error");
        exit(EXIT_FAILURE);
    }

    /* Send arguments */
    for (i = 1; i < argc; i++) {
        ret = write(data_socket, argv[i], strlen(argv[i]) + 1);
        if (-1 == ret) {
            perror("Write error");
            break;
        }
    }
    strcpy(buf, "END");
    ret = write(data_socket, buf, strlen(buf) + 1);
    printf("write %s to socket, ret = %d\n", buf, ret);
    if (-1 == ret) {
        perror("Write to socket error");
        exit(EXIT_FAILURE);
    }
    /* Read the result */
    memset(buf, 0, sizeof(buf));
    ret = read(data_socket, buf, sizeof(buf));
    if (-1 == ret) {
        perror("Read from client socket error");
        exit(EXIT_FAILURE);
    }
    buf[sizeof(buf) - 1] = 0;
    printf("Result = %s\n", buf);
    close(data_socket);
    exit(EXIT_SUCCESS);
}

运行 客户端几次,服务器将在某些时候阻塞 read() 调用:

$ for i in {1..100}; do ./client 3 4 5 6; done
write END to socket, ret = 4
Result = 18
write END to socket, ret = 4

服务器输出:

$ ./server
Waiting to accept a connection...
Accepted socket fd = 4
Read data: 3
Read data: 4
Read data: 5
Read data: 6
Read data: END
Waiting to accept a connection...
Accepted socket fd = 4
Read data: 3

服务器在 while ((num_rd = read(cfd, buf, sizeof(buf))) > 0) 行阻塞。

编辑:我的问题是为什么 read() 块。 AFAIK,read() 将阻塞,直到从套接字中读取至少 1 个字节的数据。在这种情况下,客户端发送的数据多于服务器读取的数据,因此我认为可以从套接字读取可用数据。那么为什么 read() 仍然阻塞?

问题的核心是代码测试缓冲区中的第一条消息,而忽略了同一缓冲区可能包含多条消息、部分消息或任何其他组合的可能性(参见 edit).因此,消息 END 有时会被忽略,并且 read 循环从未终止。

该代码假定 single read 将准确接收 single write 调用发送的内容.

这是非常不准确的,很少是真的,而且可能只有在客户端和服务器都在同一台机器上的时候才有效。

单个 read 可能会同时读取 2 个 write 调用,或者它可能会读取半个 write 调用,然后再读取另外 1.5 个 write 调用...

TCP/IP(不像 UDP) is a 并且不知道消息边界。


编辑:

为了澄清(按照评论中的要求),假设调用 read 收集以下数据 "1234[=19=]EN"(下一个 read 将收集 "D[=21=]")。 .. 程序是做什么的?

还有一种可能的情况是writes一口气读完。即 buf 包含字符串 "3[=24=]END[=24=]".

此时循环内发生了什么?

在此示例场景中,if 语句(strncmp(buf, "END", sizeof(buf)) 始终是 false(并且不安全),导致服务器永远不会从 while(read)循环。

由于 while 循环继续,服务器将在没有可用数据时尝试另一个 read,导致服务器阻塞,直到客户端发送更多数据。

空终止符需要紧跟在读取的数据之后,而不仅仅是缓冲区的末尾。否则,当您尝试将缓冲区视为字符串时,之前的内容将成为字符串的一部分。

buf[num_read] = '[=10=]';

并且为了防止这个写到缓冲区外,你需要在调用read()时从缓冲区大小中减去1:

while ((num_rd = read(cfd, buf, sizeof(buf)-1)) > 0) {

服务器随后阻塞,因为它无法识别使其跳出读取循环的 END 消息,并且没有更多内容可供其读取。它会阻塞,直到有数据要读取或 EOF。当客户端关闭连接时,它会得到 EOF,但客户端不会这样做,直到服务器发送结果。

但是,我认为你的整个设计可能注定要失败。 TCP 是一种流协议,它没有消息边界。客户端中对 write() 的每次调用不一定会在服务器中产生单个 read() ,连续的写入可以(并且通常会)连接起来。你需要重新设计你的协议来处理这个问题。

大部分straight-forward是用一个poll().

更新:
我认为其他答案中写的东西的真正原因。你想在你有流的地方管道。
但是
你可以试试这个

SOCKET q = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
bool c = 1;
setsockopt(q, IPPROTO_TCP, TCP_NODELAY, (char *)&c, sizeof(c));

终于

#include <stdio.h>
#include <stddef.h>
#include <sys/socket.h>    //socket
#include <netdb.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdlib.h>
#include <netinet/tcp.h>
#include <string.h>
#include <poll.h>

int main(int argc , char *argv[])
{
    struct sockaddr_in addr = { AF_INET , htons( 8888 ) /* btc port */ , htonl(INADDR_LOOPBACK) };;
    int down_flag = 0;
    int result = 0;
    int ret = 0;
    bool c = !false;
    enum { buf_size = 1025 };
    char buf[buf_size] = {0};
    int bc = 1024;    

    int data_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    setsockopt(data_socket, SOL_SOCKET, SO_KEEPALIVE, (char *)&c, sizeof(c));
    setsockopt(data_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&c, sizeof(c));
    setsockopt(data_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&c, sizeof(c));
    setsockopt(data_socket, SOL_SOCKET, SO_REUSEPORT, (char *)&c, sizeof(c));
    setsockopt(data_socket, SOL_SOCKET, SO_SNDBUF, (int *)&bc, sizeof(bc));
    setsockopt(data_socket, SOL_SOCKET, SO_RCVBUF, (int *)&bc, sizeof(bc));
    ret = connect(data_socket, (const struct sockaddr *) &addr, sizeof(addr));
    if (-1 == ret) {
        perror("Connect error");
        exit(EXIT_FAILURE);
    }
    setsockopt(data_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&c, sizeof(c));    /* Send arguments */

    struct pollfd pfd[1];
    int nready;
    pfd[0].fd = data_socket;
    pfd[0].events = POLLOUT;

    for (int k = 1; k < argc; k++)
    {
        nready = poll(pfd, 1, 15 * 1000);
        // if((pfd[0].revents & (POLLOUT|POLLHUP))) printf("tray \n" );
        memset(buf, 0, sizeof(buf));
        strcpy(buf, argv[k]);
        ret = write(data_socket, buf , 99);
        if (-1 == ret) {
            perror("Write error");
            break;
        }
    }

    memset(buf, 0, sizeof(buf));
    strcpy(buf, "END");
    nready = poll(pfd, 1, 15 * 1000);
    ret = write(data_socket, buf, 99);
    printf("write %s to socket, ret = %d\n", buf, ret);
    if (-1 == ret) {
        perror("Write to socket error");
        exit(EXIT_FAILURE);
    }
    /* Read the result */
    memset(buf, 0, sizeof(buf));
    pfd[0].events = POLLIN;
    nready = poll(pfd, 1, 15 * 1000);
    ret = read(data_socket, buf, 99);
    if (-1 == ret) {
        perror("Read from client socket error");
        exit(EXIT_FAILURE);
    }
    buf[sizeof(buf) - 1] = '[=11=]';
    printf("Result = %10s\n", buf);
    close(data_socket);
    exit(EXIT_SUCCESS);
}

#include <stdio.h>
#include <stddef.h>
#include <sys/socket.h>    //socket
#include <netdb.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdlib.h>
#include <netinet/tcp.h>
#include <string.h>
#include <poll.h>


int main(int argc , char *argv[])
{
    struct sockaddr_in addr = { AF_INET , htons( 8888 ) /* btc port */ , htonl(INADDR_LOOPBACK) };;
    int down_flag = 0;
    int result = 0;
    int ret = 0;
    bool c = !false;
    int bc = 1024;    

    int sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sfd < 0) {
        perror("Create server socket error: %s\n");
        return 0;
    }
    setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (char *)&c, sizeof(c));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (char *)&c, sizeof(c));
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *)&c, sizeof(c));
    setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, (char *)&c, sizeof(c));
    setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (int *)&bc, sizeof(bc));
    setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, (int *)&bc, sizeof(bc));
    /* Bind socket to loopback address */
    if (bind(sfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) == -1) {
        perror("Bind server socket failed");
        goto _exit;
    }
    setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (char *)&c, sizeof(c));

    if (listen(sfd, 128) == -1) {
        perror("Listen failed");
        goto _exit;
    }

    ssize_t num_rd = 0;
    enum { buf_size = 1025 };
    char buf[buf_size] = {0};

    struct pollfd pfd[1] = {{0}};
    int nready;

    for (;;)
    {
        printf("Waiting to accept a connection...\n");
        int cfd = accept(sfd, NULL, NULL);
        printf("Accepted socket fd = %d\n", cfd);
        result = 0;
        pfd[0].fd = cfd;
//      pfd[0].fd = sfd;
        pfd[0].events = POLLIN;
        while (!false) {
            memset(buf, 0, sizeof(buf));
            nready = poll(pfd, 1, 15 * 1000);
            num_rd = read(cfd, buf, 99);
            if (num_rd <= 0) break;
            buf[sizeof(buf) - 1] = '[=12=]';
            printf("Read data: %s\n", buf);

            /* Handle commands */
            if (!strncmp(buf, "DOWN", strlen(buf))) {
                down_flag = 1;
                break;
            }
            if (!strncmp(buf, "END", strlen(buf))) {
                break;
            }
            int temp = 0;
            int f = sscanf(buf, "%d", &temp);
            if (f != 1)
            {
                printf("and then \n" );
                return (0);
            }
            result = result + temp;
        }
        if (-1 == num_rd) {
            perror("Read error");
        }

        memset(buf, 0, sizeof(buf));
        sprintf(buf, "%d", result);
        pfd[0].events = POLLOUT;
        nready = poll(pfd, 1, 15 * 1000);
        ret = write(cfd, buf, 99);
        if (-1 == ret) {
            perror("Write error\n");
            goto _exit;
        }
        close(cfd);
        /* Quit on DOWN command */
        if (down_flag) {
            break;
        }
    }
_exit:
    close(sfd);
    return 0;
}

那里的所有东西也是一种尺寸。一次写入后 read_buffer 应该已满,以便其他人等待读取完成

运行:
./serv
for i in {1..100}; do ./client 3 4 5 6; done

也在 github:
https://github.com/alexeyneu/BlockZero/tree/master/onemore

编辑:
使用 1kb/command 看起来它会阻止一些大的传输。 SO_XXX 缓冲区的最小值