如何通过管道发送不同数量的数据并测量它们之间的执行时间?

How to send different amounts of data through pipes and measure the execution time between them?

我目前正在尝试通过 Linux 管道在父进程和子进程之间发送大量不同数量的数据。

我发送的数据量是:1 KB、10 KB、100 KB、1 MB、10 MB 和 100 MB。

问题在于,尽管数据包越来越大,但每种情况下的执行时间都非常相似,而实际上它们应该在增加。

代码如下:

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <time.h>

/*
 * Allocate a buffer of kbNum KiB (kbNum * 1024 bytes) and fill every byte
 * with '*'.
 *
 * kbNum: requested size in KiB; must not be negative.
 *
 * Returns a heap buffer that the caller must release with free(). The buffer
 * is not NUL-terminated. On a negative size or an allocation failure a
 * diagnostic is written to stderr and the process exits with status 1.
 */
char* generateData(int kbNum) {
    if (kbNum < 0) {
        fprintf(stderr, "invalid data size: %d KB\n", kbNum);
        exit(1);
    }

    // Multiply in size_t so that a large kbNum cannot overflow a signed int.
    size_t bytes = (size_t)1024 * (size_t)kbNum;

    // Ask for at least one byte: malloc(0) is allowed to return NULL.
    char* data = malloc(bytes > 0 ? bytes : 1);
    if (data == NULL) {
        fprintf(stderr, "failed to allocate %zu bytes of memory\n", bytes);
        exit(1);
    }

    memset(data, '*', bytes);

    return data;
}

/*
 * Report a fatal error and terminate the process.
 *
 * message: NUL-terminated text, written verbatim (no newline is appended).
 *
 * The text goes to stderr so that diagnostics stay separate from the
 * program's regular output on stdout. Never returns; exits with status 1.
 */
void errorMessage(const char *message){
    fputs(message, stderr);
    exit(1);
}

/*
 * Print the six measured times, one line each.
 *
 * times: array of at least six elapsed times in microseconds. They are
 *        labelled, in order, 1KB, 10KB, 100KB, 1MB, 10MB and 100MB.
 */
void printTimes(long int* times){
    int size = 1;            // number shown in front of the unit
    bool megabytes = false;  // false: label is KB, true: label is MB
    int slot = 0;

    while (slot < 6){
        const long int elapsed = times[slot];
        slot++;

        if (!megabytes) printf("El tiempo para %dKB fue de %ld μs usando tuberias\n", size, elapsed);
        else printf("El tiempo para %dMB fue de %ld μs usando tuberias\n", size, elapsed);

        // After 100 the label restarts at 1 and switches to MB.
        if (size >= 100){
            size = 1;
            megabytes = true;
        }
        else size *= 10;
    }
}

/*
 * Stop the program if creating either pipe or forking failed.
 *
 * processId:     return value of fork(); a negative value means failure.
 * pipesPointers: return values of the two pipe() calls; a negative value
 *                means that pipe could not be created.
 *
 * Failures are reported through errorMessage().
 */
void checkErrors(int processId, int pipesPointers[2]){
    for (int i=0; i<2; i++){
        if (pipesPointers[i] < 0){
            // Format the pipe index as text into a buffer that has room for
            // it; appending to an exactly-sized array would overflow it.
            char message[64];
            snprintf(message, sizeof message, "Error starting the pipe %d", i);
            errorMessage(message);
        }
    }

    // Generating the corresponding error message for the fork
    if (processId < 0) errorMessage("Error generating the fork");
}

// Child side of the benchmark: in each round, read from pipeRead and then
// write an int acknowledgement to pipeWrite.
//
// pipeWrite: descriptor the acknowledgement is written to.
// pipeRead:  descriptor the data is read from.
void childrenProcess(int pipeWrite, int pipeRead){
    int check = 1;

    // NOTE(review): i starts at 0 and the step is i * 10, so i stays 0 and the
    // loop condition never becomes false.
    for (int i = 0; i < 100001; i = i * 10){
        // NOTE(review): this buffer is never freed and, because of how read()
        // is called below, it never receives the data either.
        char* data = malloc(1024 * i);

        // Get Data Package
        // NOTE(review): &data is the address of the pointer variable and
        // sizeof(data) is the size of a pointer, so at most that many bytes are
        // read and they overwrite the pointer instead of filling the buffer.
        read(pipeRead, &data, sizeof(data));

        // Sending check
        // The return values of read() and write() are not checked.
        write(pipeWrite, &check, sizeof(int));
    }
}

// Parent side of the benchmark: for i = 1, 10, ..., 100000 (six rounds),
// generate i KB of data, time a write to pipeWrite followed by reading the
// acknowledgement from pipeRead, then print the six elapsed times.
//
// pipeWrite: descriptor the data is written to.
// pipeRead:  descriptor the acknowledgement is read from.
void parentProcess(int pipeWrite, int pipeRead){
    struct timeval start, stop;
    long int times[6];  // one slot per round
    int index = 0;

    for (int i = 1; i < 100001; i = i * 10){
        // The buffer is created before the clock starts, so generating the
        // data is not part of the measurement.
        char* data = generateData(i);
        gettimeofday(&start, NULL);
        int dataCheck;

        // Sending Data Package to consumer
        // NOTE(review): &data is the address of the pointer variable and
        // sizeof(data) is the size of a pointer, so every round writes the
        // same few bytes regardless of i; the generated buffer is never sent.
        write(pipeWrite, &data, sizeof(data));

        // Getting the check confirmation
        // The return values of write() and read() are not checked.
        read(pipeRead, &dataCheck, sizeof(dataCheck));

        // Get the time elapsed time
        gettimeofday(&stop, NULL);

        // Difference between the two samples, in microseconds.
        times[index] = (stop.tv_sec - start.tv_sec) * 1000000 + stop.tv_usec - start.tv_usec;
        index++;
        free(data);
    }

    printTimes(times);
}

/*
 * Create the two pipes, fork, and run the parent or the child side of the
 * benchmark over them.
 *
 * parentPipes carries data from the parent to the child; childrenPipes
 * carries the child's acknowledgements back to the parent. Each process
 * closes the pipe ends it does not use before starting, and its own ends
 * once its side has finished.
 */
void startProgram(){
    int pipePointers[2], parentPipes[2], childrenPipes[2];
    pid_t processId = 0;

    // Create both pipes and validate them before forking, so that a pipe
    // failure is reported once instead of by two processes.
    pipePointers[0] = pipe(parentPipes);
    pipePointers[1] = pipe(childrenPipes);
    checkErrors(0, pipePointers);

    processId = fork();
    checkErrors(processId, pipePointers);

    // Select the code that each process executes
    if (processId == 0){
        // Child: reads from parentPipes, writes to childrenPipes.
        close(childrenPipes[0]);
        close(parentPipes[1]);
        childrenProcess(childrenPipes[1], parentPipes[0]);
        close(childrenPipes[1]);
        close(parentPipes[0]);
    }
    else if (processId > 0){
        // Parent: writes to parentPipes, reads from childrenPipes.
        close(childrenPipes[1]);
        close(parentPipes[0]);
        parentProcess(parentPipes[1], childrenPipes[0]);
        close(childrenPipes[0]);
        close(parentPipes[1]);
    }
}

// Program entry point: runs the pipe benchmark. The command-line arguments
// are not used.
int main(int argc, char const *argv[]){
    (void)argc;
    (void)argv;

    startProgram();

    return EXIT_SUCCESS;
}

这里是输出时间:

The elapsed time for 1KB was 41 μs using pipes.
The elapsed time for 10KB was 88 μs using pipes.
The elapsed time for 100KB was 74 μs using pipes.
The elapsed time for 1MB was 79 μs using pipes.
The elapsed time for 10MB was 34 μs using pipes.
The elapsed time for 100MB was 24 μs using pipes.

非常感谢您的帮助,我会认真听取任何意见或答复。

最后,祝您度过愉快的一天!

在子进程(child)的代码中,您写的是:

read(pipeRead, &data, sizeof(data));

其中 data 的声明是 char *data = …。父进程(parent)中的 write() 调用也是如此。这意味着您每次只向子进程写入 8 个字节(假设是 64 位 CPU),也就是指针本身的大小;这个大小始终不变,所以耗时也不会变化。

要修复这个问题,您需要把要发送的数据大小计算出来并保存在一个变量中,然后在调用 read() 和 write() 时使用它。您还应该把 data 而不是 &data 传递给 read() 和 write()。

还有其他问题——其中较大的一个是子进程中的循环 for (int i = 0; i < 100001; i = i * 10){。零乘以十仍然是零,所以这个循环永远不会结束。另一个问题是,您需要把写入的数据全部读出来,但管道缓冲区的大小是有限的(在现代系统上通常为 64 KiB;在旧系统上曾经是 5 KiB)。我用函数 multi_read() 和 multi_write() 解决了这个问题。我还让错误消息的代码更加灵活,并使一些名称更加一致。最终的代码如下:

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <time.h>
#include <stdarg.h>

/*
 * Report a fatal error and terminate the process.
 *
 * message: printf-style format string; the remaining arguments are the
 *          values it refers to. No newline is appended.
 *
 * The formatted text is written to stderr. Never returns; exits with
 * status 1.
 */
static void errorMessage(const char *message, ...)
{
    va_list ap;

    va_start(ap, message);
    (void)vfprintf(stderr, message, ap);
    va_end(ap);

    exit(1);
}

/*
 * Allocate a buffer of kbNum KiB (kbNum * 1024 bytes) and fill every byte
 * with '*'.
 *
 * kbNum: requested size in KiB; must not be negative.
 *
 * Returns a heap buffer that the caller must release with free(). The buffer
 * is not NUL-terminated. A negative size or an allocation failure is
 * reported through errorMessage().
 */
static char *generateData(int kbNum)
{
    if (kbNum < 0)
        errorMessage("invalid data size: %d KB\n", kbNum);

    // Multiply in size_t so that a large kbNum cannot overflow a signed int.
    size_t bytes = (size_t)1024 * (size_t)kbNum;

    // Ask for at least one byte: malloc(0) is allowed to return NULL, which
    // must not be mistaken for an allocation failure.
    char *data = malloc(bytes > 0 ? bytes : 1);

    if (data == NULL)
        errorMessage("failed to allocate %zu bytes of memory\n", bytes);

    memset(data, '*', bytes);

    return data;
}

/*
 * Print the six measured times, one line each.
 *
 * times: array of at least six elapsed times in microseconds. They are
 *        labelled, in order, 1KB, 10KB, 100KB, 1MB, 10MB and 100MB.
 */
static void printTimes(long int *times)
{
    bool inMegabytes = false;   // false: label is KB, true: label is MB
    int magnitude = 1;          // number shown in front of the unit

    for (const long int *t = times; t != times + 6; ++t)
    {
        if (inMegabytes)
            printf("El tiempo para %dMB fue de %ld μs usando tuberias\n", magnitude, *t);
        else
            printf("El tiempo para %dKB fue de %ld μs usando tuberias\n", magnitude, *t);

        // After 100 the label restarts at 1 and switches to MB.
        if (magnitude >= 100)
        {
            magnitude = 1;
            inMegabytes = true;
        }
        else
            magnitude *= 10;
    }
}

/*
 * Stop the program if creating either pipe or forking failed.
 *
 * processId:     return value of fork(); a negative value means failure.
 * pipesPointers: return values of the two pipe() calls; a negative value
 *                means that pipe could not be created.
 *
 * Failures are reported through errorMessage().
 */
static void checkErrors(int processId, int pipesPointers[2])
{
    for (int i = 0; i < 2; i++)
    {
        // Let errorMessage() format the pipe index; appending the raw bytes
        // of the int to an exactly-sized array would overflow it.
        if (pipesPointers[i] < 0)
            errorMessage("Error starting the pipe %d", i);
    }

    // Generating the corresponding error message for the fork
    if (processId < 0)
        errorMessage("Error generating the fork");
}

/*
 * Read up to nbytes from fd into buffer, calling read() repeatedly until the
 * request is satisfied, end of input is reached, or read() fails.
 *
 * fd:     descriptor to read from.
 * buffer: destination with room for at least nbytes.
 * nbytes: number of bytes wanted.
 *
 * Returns the number of bytes stored. If nothing was stored, returns the
 * result of the last read() instead (0 at end of input, negative on error),
 * or 0 when nbytes is 0.
 */
static ssize_t multi_read(int fd, char *buffer, size_t nbytes)
{
    char *cursor = buffer;
    size_t remaining = nbytes;
    ssize_t total = 0;
    ssize_t chunk = 0;

    for (;;)
    {
        if (remaining == 0)
            break;

        chunk = read(fd, cursor, remaining);
        if (chunk <= 0)
            break;

        total += chunk;
        cursor += chunk;
        remaining -= chunk;
    }

    return (total != 0) ? total : chunk;
}

/*
 * Write nbytes from buffer to fd, calling write() repeatedly until everything
 * has been written or write() stops making progress.
 *
 * fd:     descriptor to write to.
 * buffer: source data, at least nbytes long.
 * nbytes: number of bytes to send.
 *
 * Returns the number of bytes written. If nothing was written, returns the
 * result of the last write() instead (negative on error), or 0 when nbytes
 * is 0.
 */
static ssize_t multi_write(int fd, const char *buffer, size_t nbytes)
{
    const char *cursor = buffer;
    size_t remaining = nbytes;
    ssize_t total = 0;
    ssize_t chunk = 0;

    for (;;)
    {
        if (remaining == 0)
            break;

        chunk = write(fd, cursor, remaining);
        if (chunk <= 0)
            break;

        total += chunk;
        cursor += chunk;
        remaining -= chunk;
    }

    return (total != 0) ? total : chunk;
}

/*
 * Child side of the benchmark.
 *
 * For i = 1, 10, ..., 100000 (six rounds) it reads i KB from pipeRead and
 * then writes an int acknowledgement to pipeWrite. Any allocation, read or
 * write failure is reported through errorMessage().
 *
 * pipeWrite: descriptor the acknowledgement is written to.
 * pipeRead:  descriptor the data is read from.
 */
static void childProcess(int pipeWrite, int pipeRead)
{
    int check = 1;

    for (int i = 1; i < 100001; i = i * 10)     // i = 1 is key!
    {
        size_t size = (size_t)1024 * (size_t)i;
        char *data = malloc(size);

        if (data == NULL)
            errorMessage("failed to allocate memory\n");

        printf("Child reading %zu bytes of data\n", size);

        // Get Data Package
        if (multi_read(pipeRead, data, size) != (ssize_t)size)
            errorMessage("Read error in %s()\n", __func__);

        // Sending check
        if (write(pipeWrite, &check, sizeof(check)) != sizeof(check))
            errorMessage("Write error in %s()\n", __func__);

        // Release this round's buffer; it is freed only after the
        // acknowledgement has been sent so the other side is not kept
        // waiting on it.
        free(data);
    }
}

/*
 * Parent side of the benchmark.
 *
 * For 1, 10, ..., 100000 KB (six rounds) it generates the data, then times
 * writing it to pipeWrite plus reading the int acknowledgement from
 * pipeRead. The six elapsed times, in microseconds, are printed at the end.
 * Write and read failures are reported through errorMessage().
 *
 * pipeWrite: descriptor the data is written to.
 * pipeRead:  descriptor the acknowledgement is read from.
 */
static void parentProcess(int pipeWrite, int pipeRead)
{
    long int times[6];  // one slot per round
    int slot = 0;
    int kb = 1;

    while (kb < 100001)
    {
        const size_t size = 1024 * kb;
        struct timeval begin, end;
        int ack;

        // The buffer is created before the clock starts, so generating the
        // data is not part of the measurement.
        char *payload = generateData(kb);
        printf("Parent writing %zu bytes of data\n", size);
        gettimeofday(&begin, NULL);

        // Send the data to the consumer
        if (multi_write(pipeWrite, payload, size) != (ssize_t)size)
            errorMessage("Write error in %s()\n", __func__);

        // Wait for the consumer's confirmation
        if (read(pipeRead, &ack, sizeof(ack)) != sizeof(ack))
            errorMessage("Read error in %s()\n", __func__);

        gettimeofday(&end, NULL);

        // Difference between the two samples, in microseconds.
        times[slot] = (end.tv_sec - begin.tv_sec) * 1000000 + end.tv_usec - begin.tv_usec;
        slot++;

        free(payload);
        kb *= 10;
    }

    printTimes(times);
}

/*
 * Create the two pipes, fork, and run the parent or the child side of the
 * benchmark over them.
 *
 * parentPipes carries data from the parent to the child; childrenPipes
 * carries the child's acknowledgements back to the parent. Each process
 * closes the pipe ends it does not use before starting, and its own ends
 * once its side has finished.
 */
static void startProgram(void)
{
    int pipePointers[2], parentPipes[2], childrenPipes[2];
    pid_t processId = 0;

    // Create both pipes and validate them before forking, so that a pipe
    // failure is reported once instead of by two processes.
    pipePointers[0] = pipe(parentPipes);
    pipePointers[1] = pipe(childrenPipes);
    checkErrors(0, pipePointers);

    processId = fork();
    checkErrors(processId, pipePointers);

    // Select the code that each process executes
    if (processId == 0)
    {
        // Child: reads from parentPipes, writes to childrenPipes.
        close(childrenPipes[0]);
        close(parentPipes[1]);
        childProcess(childrenPipes[1], parentPipes[0]);
        close(childrenPipes[1]);
        close(parentPipes[0]);
    }
    else if (processId > 0)
    {
        // Parent: writes to parentPipes, reads from childrenPipes.
        close(childrenPipes[1]);
        close(parentPipes[0]);
        parentProcess(parentPipes[1], childrenPipes[0]);
        close(childrenPipes[0]);
        close(parentPipes[1]);
    }
}

// Program entry point: runs the pipe benchmark.
int main(void)
{
    startProgram();

    return EXIT_SUCCESS;
}

示例运行的输出如下:

Parent writing 1024 bytes of data
Child reading 1024 bytes of data
Child reading 10240 bytes of data
Parent writing 10240 bytes of data
Child reading 102400 bytes of data
Parent writing 102400 bytes of data
Child reading 1024000 bytes of data
Parent writing 1024000 bytes of data
Child reading 10240000 bytes of data
Parent writing 10240000 bytes of data
Child reading 102400000 bytes of data
Parent writing 102400000 bytes of data
El tiempo para 1KB fue de 312 μs usando tuberias
El tiempo para 10KB fue de 269 μs usando tuberias
El tiempo para 100KB fue de 324 μs usando tuberias
El tiempo para 1MB fue de 1042 μs usando tuberias
El tiempo para 10MB fue de 8136 μs usando tuberias
El tiempo para 100MB fue de 77713 μs usando tuberias