通过 TCP 进行 PvP 通信的问题

Issues with PvP communication via TCP

由于需要通过 Internet 连接在两个进程之间实现通信,因此我刚刚开始使用 TCP(以及所有相关库)。我的代码可以工作,但与我(可能是由于缺乏经验)预期的网络延迟和带宽相比,它非常慢。另外,我确信代码还有很多其他问题,它使用的是 UNIX 套接字 API。除非有充分的理由,否则我宁愿不为我的项目使用大型库(例如 Boost)。

我包括一个最小的工作示例。尽管我尽了最大努力缩短它,但它还是相当长。但是,我认为大部分问题应该在第一个文件(tcp_helpers.h)中,该文件仅由客户端和服务器主程序以相当明显的方式使用。那里的功能没有完全优化,但我很难相信这是问题所在,而是 可能是逻辑中的一些基本缺陷

我也想请教一些与问题相关的问题

  1. 对于网络性能,我应该担心使用 IPv4 还是 IPv6?会不会是我的网络不喜欢使用 IPv4 并降低了性能?
  2. 由于 Socket API 模拟了一个流,我认为无论是对较小的数据块多次调用 send() 还是对大数据块调用一次都没有关系。但也许它确实很重要,并且用较小的块来做(我每次都调用发送我的自定义协议头和数据)会导致问题?
  3. 假设两方在发送下一条消息之前通过网络通信对接收到的数据进行处理(如我的示例中所做的那样)。如果这两个进程在本地主机上花费了 x 时间来完成,那么它们在真实网络上的时间应该永远不会超过 (2*x + (network overhead)),对吗?如果 x 很小,使计算(即在发送下一条消息之前工作)更快也无济于事,对吗?
  4. 我的示例程序在本地主机上 运行 时大约需要 4 毫秒,而在我正在使用的本地(大学)网络上 运行 时 >0.7 秒。本地网络的 ping 时间(用 ping 测量)为( min/avg/max/mdev [ms] = 4.36 / 97.6 / 405. / 86.3 )和带宽(用 iperf 测量)~70Mbit /秒。当在网络上运行示例程序时,我得到(通过 wireshark 在相关端口上进行过滤测量)190 个数据包,平均吞吐量为 172kB/s,平均数据包大小约为 726 字节。这现实吗?对我来说,考虑到这些网络参数,我的程序似乎应该快得多,尽管 ping 时间相当长。
  5. 查看示例程序生成的实际网络流量,我开始思考在幕后完成的 TCP 的所有“功能”。我在某处读到许多程序同时使用多个套接字“以提高速度”。这在这里有帮助吗,例如使用两个套接字,每个套接字仅用于单向通信?特别是,也许以某种方式减少 ack 数据包的数量可以提高性能?
  6. 我将 messages/headers 写成结构的方式有(至少)两个我已经知道的大问题。首先,我不强制执行网络字节顺序。如果一个通信方使用big-endian而另一个使用little-endian,这个程序将无法运行。此外,由于结构填充(请参阅 catb.org/esr/structure-packing/),结构的大小可能因实现或编译器而异,这也会破坏我的程序。我可以向结构中添加类似 (for gcc) __attribute__((__packed__)) 的内容,但这会使它非常特定于编译器,甚至可能导致效率低下。是否有处理此问题的标准方法(我已经看到有关手动对齐的内容)? (也许我找错关键词了。)
// tcp_helpers.h. // NOTE: Using this code is very ill-advised.
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <unistd.h>  // POSIX specific
#include <sys/socket.h> // POSIX specific
#include <netinet/in.h> // POSIX specific
#include <arpa/inet.h> // POSIX specific
#include <cerrno>  // for checking socket error messages
#include <cstdint> // for fixed length integer types

//////////////////// PROFILING ///////////////////
#include <chrono>
static auto start = std::chrono::high_resolution_clock::now();
void print_now(const std::string &message) {
    auto t2 = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> time_span = t2 - start;
    std::cout << time_span.count() << ": " << message << std::endl;
}
//////////////////// PROFILING ///////////////////

struct TCPMessageHeader {
    uint8_t protocol_name[4];
    uint32_t message_bytes;
};

struct ServerSends {
    uint16_t a;
    uint32_t b;
    uint32_t c;
};

typedef uint8_t ClientSends;

namespace TCP_Helpers {
    template<typename NakedStruct>
    void send_full_message(int fd, TCPMessageHeader header_to_send, const std::vector<NakedStruct> &structs_to_send) {
        print_now("Begin send_full_message");
        if (header_to_send.message_bytes != sizeof(NakedStruct) * structs_to_send.size()) {
            throw std::runtime_error("Struct vector's size does not match the size claimed by message header");
        }
        int bytes_to_send = sizeof(header_to_send);
        int send_retval;
        while (bytes_to_send != 0) {
            send_retval = send(fd, &header_to_send, sizeof(header_to_send), 0);
            if (send_retval == -1) {
                int errsv = errno;  // from errno.h
                std::stringstream s;
                s << "Sending data failed (locally). Errno:" << errsv << " while sending header.";
                throw std::runtime_error("Sending data failed (locally)");
            }
            bytes_to_send -= send_retval;
        }
        bytes_to_send = header_to_send.message_bytes;
        while (bytes_to_send != 0) {
            send_retval = send(fd, &structs_to_send[0], sizeof(NakedStruct) * structs_to_send.size(), 0);
            if (send_retval == -1) {
                int errsv = errno;  // from errno.h
                std::stringstream s;
                s << "Sending data failed (locally). Errno:" << errsv <<
                  " while sending data of size " << header_to_send.message_bytes << ".";
                throw std::runtime_error(s.str());
            }
            bytes_to_send -= send_retval;
        }
        print_now("end send_full_message.");
    }

    template<typename NakedStruct>
    std::vector<NakedStruct> receive_structs(int fd, uint32_t bytes_to_read) {
        print_now("Begin receive_structs");
        unsigned long num_structs_to_read;
        // ensure expected message is non-zero length and a multiple of the SingleBlockParityRequest struct
        if (bytes_to_read > 0 && bytes_to_read % sizeof(NakedStruct) == 0) {
            num_structs_to_read = bytes_to_read / sizeof(NakedStruct);
        } else {
            std::stringstream s;
            s << "Message length (bytes_to_read = " << bytes_to_read <<
              " ) specified in header does not divide into required stuct size (" << sizeof(NakedStruct) << ").";
            throw std::runtime_error(s.str());
        }
        // vector must have size > 0 for the following pointer arithmetic to work 
        // (this method must check this in above code).
        std::vector<NakedStruct> received_data(num_structs_to_read);
        int valread;
        while (bytes_to_read > 0)  // todo need to include some sort of timeout?!
        {
            valread = read(fd,
                           ((uint8_t *) (&received_data[0])) +
                           (num_structs_to_read * sizeof(NakedStruct) - bytes_to_read),
                           bytes_to_read);
            if (valread == -1) {
                throw std::runtime_error("Reading from socket file descriptor failed");
            } else {
                bytes_to_read -= valread;
            }
        }
        print_now("End receive_structs");
        return received_data;
    }

    void send_header(int fd, TCPMessageHeader header_to_send) {
        print_now("Start send_header");
        int bytes_to_send = sizeof(header_to_send);
        int send_retval;
        while (bytes_to_send != 0) {
            send_retval = send(fd, &header_to_send, sizeof(header_to_send), 0);
            if (send_retval == -1) {
                int errsv = errno;  // from errno.h
                std::stringstream s;
                s << "Sending data failed (locally). Errno:" << errsv << " while sending (lone) header.";
                throw std::runtime_error(s.str());
            }
            bytes_to_send -= send_retval;
        }
        print_now("End send_header");
    }

    TCPMessageHeader receive_header(int fd) {
        print_now("Start receive_header (calls receive_structs)");
        TCPMessageHeader retval = receive_structs<TCPMessageHeader>(fd, sizeof(TCPMessageHeader)).at(0);
        print_now("End receive_header (calls receive_structs)");
        return retval;
    }
}

// main_server.cpp
#include "tcp_helpers.h"

int init_server(int port) {
    int server_fd;
    int new_socket;
    struct sockaddr_in address{};
    int opt = 1;
    int addrlen = sizeof(address);
    // Creating socket file descriptor
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        throw std::runtime_error("socket creation failed\n");
    }

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        throw std::runtime_error("failed to set socket options");
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(port);
    // Forcefully attaching socket to the port
    if (bind(server_fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
        throw std::runtime_error("bind failed");
    }
    if (listen(server_fd, 3) < 0) {
        throw std::runtime_error("listen failed");
    }
    if ((new_socket = accept(server_fd, (struct sockaddr *) &address, (socklen_t *) &addrlen)) < 0) {
        throw std::runtime_error("accept failed");
    }
    if (close(server_fd)) // don't need to listen for any more tcp connections (PvP connection).
        throw std::runtime_error("closing server socket failed");
    return new_socket;
}

int main() {
    int port = 20000;
    int socket_fd = init_server(port);
    while (true) {
        TCPMessageHeader rcv_header = TCP_Helpers::receive_header(socket_fd);
        if (rcv_header.protocol_name[0] == 0)   // using first byte of header name as signal to end
            break;
        // receive message
        auto rcv_message = TCP_Helpers::receive_structs<ClientSends>(socket_fd, rcv_header.message_bytes);
        for (ClientSends ex : rcv_message) // example "use" of the received data that takes a bit of time.
            std::cout <<  static_cast<int>(ex) << " ";
        std::cout << std::endl << std::endl;

        // send a "response" containing 1000 structs of zeros
        auto bunch_of_zeros = std::vector<ServerSends>(500);
        TCPMessageHeader send_header{"abc", 500 * sizeof(ServerSends)};
        TCP_Helpers::send_full_message(socket_fd, send_header, bunch_of_zeros);

    }
    exit(EXIT_SUCCESS);
}
// main_client.cpp
#include "tcp_helpers.h"

int init_client(const std::string &ip_address, int port) {
    int sock_fd;
    struct sockaddr_in serv_addr{};

    if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        throw std::runtime_error("TCP Socket creation failed\n");
    }
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    // Convert IPv4 address from text to binary form
    if (inet_pton(AF_INET, ip_address.c_str(), &serv_addr.sin_addr) <= 0) {
        throw std::runtime_error("Invalid address/ Address not supported for TCP connection\n");
    }
    if (connect(sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
        throw std::runtime_error("Failed to connect to server.\n");
    }
    return sock_fd;
}

int main() {
    // establish connection to server and get socket file descriptor.
    int port = 20000;
    int socket_fd = init_client("127.0.0.1", port);
    for (int i = 0; i < 20; ++i) {  // repeat sending and receiving random data
        // send a message containing 200 structs of zeros
        auto bunch_of_zeros = std::vector<ClientSends>(250);
        TCPMessageHeader send_header{"abc", 250 * sizeof(ClientSends)};
        TCP_Helpers::send_full_message(socket_fd, send_header, bunch_of_zeros);

        // receive response
        TCPMessageHeader rcv_header = TCP_Helpers::receive_header(socket_fd);
        auto rcv_message = TCP_Helpers::receive_structs<ServerSends>(socket_fd, rcv_header.message_bytes);
        for (ServerSends ex : rcv_message) // example "use" of the received data that takes a bit of time.
            std::cout << ex.a << ex.b << ex.c << " ";
        std::cout << std::endl << std::endl;
    }
    auto end_header = TCPMessageHeader{}; // initialized all fields to zero. (First byte of name == 0) is "end" signal.
    TCP_Helpers::send_header(socket_fd, end_header);
    exit(EXIT_SUCCESS);
}

我怀疑的第一件事是 Nagle's algorithm;如果它在您的 TCP 套接字上启用(默认情况下是启用的),那么它可以为 send() 调用增加多达 200 毫秒的延迟。如果它已启用,请尝试将其禁用(如下面的代码所示)并查看是否能让您更快。

// Disable Nagle's algorithm for TCP socket (s)
const int enableNoDelay = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &enableNoDelay, sizeof(enableNoDelay)) != 0) 
{
   perror("setsockopt");
}

For network performance, should I worry about using IPv4 vs IPv6? Could it be that my network dislikes the use of IPv4 somehow and penalized performance?

就性能而言,IPv4 和 IPv6 相似;它们的区别更多地在于易于配置方面;使用更适合您的用例的那个;两者都不会明显快于或慢于另一个。 (为了获得最大的灵活性,我建议同时支持两者;这在任何双栈 OS 下都可以轻松完成,方法是编写您的程序以使用 IPv6,然后启用 IPv4-mapped IPv6-addresses 以便您的 IPv6 套接字也可以通过 IPv4 进行通信)

Since the Socket API emulates a stream, I would think it does not matter if you call send() multiple times on smaller chunks of data or once on a big chunk. But perhaps it does matter and doing it with smaller chunks (I call send for my custom protocol header and the data separately each time) leads to issues?

Nagle 算法是否启用并不重要; Nagle 的算法实际上用于在通过网络发送数据之前将尽可能多的数据收集到一个数据包中(类似于机场的停车班车有时会等待几分钟以收集更多乘客,然后再开往停车场).这提高了效率,因为较大的数据包比较小的数据包具有更好的有效负载与开销之比,但代价是增加了延迟。关闭 Nagle 算法将防止延迟发生,这意味着您的数据更有可能立即传出到网络,但也更有可能许多传出数据包非常小。如果你想成为最优的,你可以manage动态启用和禁用Nagle的算法,这样你就可以同时获得更大数据包的改进效率立即的低延迟发送数据包。

Suppose that two parties communicate over a network doing work on the received data before sending their next message (as is done in my example). If the two processes take x amount of time on localhost to finish, they should never take longer than (2*x + (network overhead)) on the real network, right? If x is small, making the computations (i.e. work before sending next message) go faster will not help, right?

TCP 不是实时协议;特别是它优先考虑正确的传输而不是有限的传输时间。这意味着原则上任何 TCP 传输都可以花费任意多的时间来完成,因为直到数据到达接收程序才完成工作,并且如果网络正在丢弃数据包,TCP 堆栈将必须保持重新发送他们,直到他们最终到达那里。您可以通过在一台计算机和另一台计算机之间设置 TCP 数据传输然后在传输过程中拔出以太网电缆几秒钟来自行测试 - 请注意,当电缆断开时传输“暂停”,然后恢复(缓慢启动并再次加速),在重新连接电缆后没有任何数据丢失。

也就是说,这听起来像是 Amdahl's Law 的情况,它(广泛解释)表示加速已经很快的操作的一部分不会降低整个序列的速度;因为序列的慢部分保持不变并且仍然代表花费的大部分时间。这听起来像你的例子中的情况。

My example program takes about 4ms when running on localhost and >0.7 seconds when running on the local (university) network I'm using. The local network has ping times (measured with ping) of ( min/avg/max/mdev [ms] = 4.36 / 97.6 / 405. / 86.3 ) and a bandwidth (measured with iperf) of ~70Mbit/s. When running the example program on the network I get (measured with wireshark filtering on the port in question) 190 packets with an average throughput of 172kB/s and average packet size ~726 Bytes. Is this realistic?

这对我来说不是最理想的;如果你可以 运行 另一个程序(例如 iperf 或 scp 或其他)使用 TCP 以 70Mbit/sec 的速度传输数据,那么你自己的程序没有理由不能在相同的地方做同样的事情硬件,一旦它被正确编写并消除了瓶颈。但是您通常不会从天真编写的程序中获得最佳性能;首先需要对瓶颈是什么以及如何消除它们进行一些调整和了解。

To me it seems like my program should be much faster given these network parameters, despite the fairly high ping time.

请记住,如果程序 A 向程序 B 发送数据,然后等待程序 B 响应,则需要一次完整的网络往返,在最佳情况下,这将是网络 ping 时间的两倍。如果两侧都启用 Nagle 的算法,它最终可能会比这长 400 毫秒。

Looking at the actual network traffic generated by the example program, I started thinking about all the "features" of TCP that are done under the hood. I read somewhere that many programs use several sockets at the same time "to gain speed". Could this help here, for example using two sockets, each for just one-way communication? In particular, maybe somehow reducing the number of ack packets could help performance?

不是,不是。无论您设置多少(或多少)TCP 连接,所有数据都必须通过相同的物理硬件;所以拥有多个 TCP 连接只是将相同大小的馅饼分成更小的部分。唯一可能有用的情况是,如果您希望能够无序地传递消息(例如,在传输时将高优先级命令消息异步发送到您的批量),因为单个 TCP 连接总是严格地传递数据先进先出顺序,而TCP连接B中的数据通常可以先行并立即发送,即使TCP连接A中存在大量流量积压。

在您对 TCP 有更多经验之前,我不会尝试实现它;使用单个 TCP 连接可以实现高带宽和低延迟,因此在尝试更精细的操作之前先对其进行优化。

还请记住,如果您正在进行双向通信并使用阻塞 I/O 调用来执行此操作,那么每当程序在 recv() 内阻塞时,它必须等到某些在 recv() 调用 return 之前已收到数据,在此期间它无法调用 send() 以向网络提供更多传出数据。类似地,任何时候程序在 send() 内被阻塞(等待套接字的传出数据缓冲区耗尽足以容纳来自 send() 调用的数据),程序被阻塞并且不能'在 send() return 之前不要做任何事情;特别是在这段时间内它不能调用 recv() 来接收传入的数据。这种半双工行为会显着限制数据吞吐量;解决方法包括使用非阻塞 I/O 调用而不是阻塞 I/O,或使用多线程,或使用异步 I/O 调用(任何这些选项都需要对程序进行重大重新设计,虽然)。

Are there standard ways of dealing with [endian-ness and alignment/packing issues] (I've seen something about aligning manually)? (Maybe I'm looking for the wrong keywords.)

有标准的(或者至少是公开可用的)方法来处理这些问题;您要的关键字是“数据序列化”;即,将数据对象转换为定义明确的字节序列(以便您可以通过网络发送字节),然后进行“数据反序列化”(接收程序将该字节序列转换回数据对象)与发件人发送的相同)。这些步骤不是火箭科学,但要 100% 正确可能有点棘手,因此您可以查看准备好的解决方案,例如 Google 的 Protocol Buffers library to handle the tedious parts for you. But if you're really keen to do it all yourself, have a look at this question 及其答案,以获取一些示例你可能会做到这一点。

您关心延迟,所以首先要做的是始终确保禁用 Nagle 算法,TCP_NODELAY。另一个答案显示了如何。

Nagle 的算法以延迟为代价明确地优化了吞吐量,当你想要相反的时候。

I also want to ask some questions relevant to the problem:

我希望你不会 - 这使这个问题成为一个无法完全回答的问题。

  1. For network performance, should I worry about using IPv4 vs IPv6? Could it be that my network dislikes the use of IPv4 somehow and penalized performance?

没有明显的原因它应该重要,如果有的话,v4 堆栈可能会得到更好的优化,因为它仍然(在撰写本文时)被更频繁地使用。

不过,如果您想进行测试,您已经在使用 iperf - 因此请自行比较网络上的 v4 和 v6 性能。如果您不理解结果,请单独提问。

  1. Since the Socket API emulates a stream, I would think it does not matter if you call send() multiple times on smaller chunks of data or once on a big chunk. But perhaps it does matter and doing it with smaller chunks (I call send for my custom protocol header and the data separately each time) leads to issues?

当然它有所作为。

首先,考虑网络堆栈需要以某种方式决定如何将该流分成数据包。使用 Nagle 的算法,这是通过等待计时器(或下一个确认,这也是它与客户端的延迟确认计时器交互的原因)来完成的。使用 TCP_NODELAY,每次调用 send() 通常都会产生自己的数据包。

由于数据包 headers,在更多数据包中发送相同数量的用户数据会占用更多网络带宽。默认情况下,延迟和吞吐量效率之间的权衡由 Nagle 算法和延迟确认计时器处理。如果您禁用 Nagle 算法,您可以手动控制权衡,这样您就可以做最适合您的程序的事情 - 但它一个权衡,需要一些思考和努力。

其次,对send()的调用本身并不是免费的。系统调用比 user-space 库调用更昂贵。

  1. Suppose that two parties communicate over a network doing work on the received data before sending their next message (as is done in my example). If the two processes take x amount of time on localhost to finish, they should never take longer than (2*x + (network overhead)) on the real network, right? If x is small, making the computations (i.e. work before sending next message) go faster will not help, right?

您的估计看起来合理,但是 - 时间就是时间。 仅仅因为网络支配总延迟,并不意味着本地计算的加速没有效果。

如果您使计算速度加快 1ns,即使网络延迟为 10ms,整体速度仍然快 1ns。您对网络延迟的直接控制也较少,因此可能需要尽可能节省时间。

  1. ... To me it seems like my program should be much faster given these network parameters, despite the fairly high ping time.

是的,它应该 - 使用 TCP_NODELAY 和正确的 send() 调用次数重试。

  1. ... Could this help here, for example using two sockets, each for just one-way communication? In particular, maybe somehow reducing the number of ack packets could help performance?

由于延迟确认计时器,对称 two-way 通信的确认基本上是免费的。您的 Wireshark 调查应该已经显示了这一点。对于 one-way 流,它们 不是 免费的,因此使用两个 half-duplex 套接字 更糟

  1. The way I'm writing messages/headers as structs has (at least) two big problems that I already know. First, I do not enforce network byte order. If one communicating party uses big-endian and the other little-endian, this program will not work. Furthermore, due to struct padding (see [catb.org/esr/structure-packing/][1]), the sizes of the structs may vary between implementations or compilers, which would also break my program. I could add something like (for gcc) __attribute__((__packed__)) to the structs but that would make it very compiler specific and perhaps even lead to inefficiency. Are there standard ways of dealing with this issue (I've seen something about aligning manually)? (Maybe I'm looking for the wrong keywords.)

处理这些问题的标准方法太多了,没有什么比得上单一标准了。

  • Endianness - 最简单的方法是获取当前主机的本机字节顺序,并使用它。如果您以不同的顺序连接主机,那将需要做额外的工作,但它很可能永远不会发生,您可以推迟额外的工作。

  • 填充:

    使用__attribute__((packed))#pragma pack当然会导致一些低效率,但很方便。请注意,不需要指向未对齐字段的指针和引用即可正常工作,因此这些结构并不是真正的 general-purpose.

    手动填充 do-able 但乏味。您只需要弄清楚本机 laid-out 结构中每个字段的实际对齐方式,然后插入填充字节,这样其他实现就不会以不同方式布局。您可以使用 alignas 说明符以更好的方式实现相同的目的。

    一种免费获得大部分对齐方式的简单方法是始终从最大到最小排列字段(包括大小和对齐方式,但它们通常是相关的)。

  • 通常,序列化 是将本机数据转换为有线格式(以及反序列化)的名称。这涵盖了从转换数据 to/from JSON 字符串以实现非常广泛的兼容性到发送精确的 laid-out 二进制数据的整个范围。您的延迟限制使您处于最后一端。