在并行环境中使用 fork、exec 和管道时出现死锁

Deadlock when using fork, exec and pipes in a parallel environment

我正在使用 fork 和 exec 生成一个子进程。使用两个管道提供输入并从该进程接收输出。

它在大多数时间都工作得很好,但是当我使用类似 openmp 的东西来测试它在并发环境中的表现时,它在 read 系统调用中挂起,或者有时 waitpid

当我 strace 编辑子进程时,我发现它也在 read 系统调用上被阻止。这很奇怪,因为我只在提供了所有输入并关闭管道的写入端后才等待父进程中的读取。

我尝试创建一个 MVCE,但它有点长。我不知道如何让它更短。为了简单起见,我删除了大部分错误检查代码。

请注意,我的代码中没有全局变量。而且我没有尝试从多个线程中的相同文件描述符 read/write。

我想不出会出什么问题。所以希望你们能发现我做错了什么。

有:

#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>

size_t
min(size_t first, size_t second)
{
    if(first < second)
    {
        return first;
    }

    return second;
}

struct RDI_Buffer
{
    char* data;
    size_t size;
};

typedef struct RDI_Buffer RDI_Buffer;

RDI_Buffer
rdi_buffer_init()
{
    RDI_Buffer b = {0};
    return b;
}

RDI_Buffer
rdi_buffer_new(size_t size)
{
    RDI_Buffer b;

    b.data = malloc(size);
    b.size = size;
    return b;
}

void
rdi_buffer_free(RDI_Buffer b)
{
    if(!b.data)
    {
        return;
    }

    free(b.data);
}

RDI_Buffer
rdi_buffer_resize(RDI_Buffer b, size_t new_size)
{
    if(!b.data)
    {
        return rdi_buffer_new(new_size);
    }

    char* new_data = realloc(b.data, new_size);

    if(new_data)
    {
        b.size = new_size;
        b.data = new_data;
        return b;
    }

    RDI_Buffer output = rdi_buffer_new(new_size);
    memcpy(output.data, b.data, output.size);
    rdi_buffer_free(b);
    return output;
}

RDI_Buffer
rdi_buffer_null_terminate(RDI_Buffer b)
{
    b = rdi_buffer_resize(b, b.size + 1);
    b.data[b.size - 1] = '[=10=]';
    return b;
}

static RDI_Buffer
rw_from_fd(int w_fd, int r_fd, RDI_Buffer input)
{
    const size_t CHUNK_SIZE = 4096;

    assert(input.size <= CHUNK_SIZE);

    write(w_fd, input.data, input.size);
    close(w_fd);

    RDI_Buffer output = rdi_buffer_new(CHUNK_SIZE);

    read(r_fd, output.data, CHUNK_SIZE);

    close(r_fd);
    return output;
}

int main()
{
#pragma omp parallel for
    for(size_t i = 0; i < 100; i++)
    {
        char* thing =
                "Hello this is a sort of long text so that we can test how "
                "well this works. It should go with cat and be printed.";

        RDI_Buffer input_buffer;
        input_buffer.data = thing;
        input_buffer.size = strlen(thing);

        int main_to_sub[2];
        int sub_to_main[2];

        pipe(main_to_sub);
        pipe(sub_to_main);

        int pid = fork();

        if(pid == 0)
        {
            dup2(main_to_sub[0], STDIN_FILENO);
            dup2(sub_to_main[1], STDOUT_FILENO);

            close(main_to_sub[1]);
            close(main_to_sub[0]);
            close(sub_to_main[1]);
            close(sub_to_main[0]);

            char* argv[] = {"cat", NULL};

            execvp("cat", argv);
            exit(1);
        }

        close(main_to_sub[0]);
        close(sub_to_main[1]);

        RDI_Buffer output =
                rw_from_fd(main_to_sub[1], sub_to_main[0], input_buffer);

        int *status = NULL;
        waitpid(pid, status, 0);

        if(status)
        {
            printf("%d\n", *status);
        }

        output = rdi_buffer_null_terminate(output);

        if(strcmp(output.data, thing) == 0)
        {
            printf("good\n");
        }
        else
        {
            printf("bad\n");
        }

        rdi_buffer_free(output);
    }
}

确保编译 link 和 -fopenmp。像这样:gcc main.c -fopenmp

正在将评论转化为答案。

您可能运行正在用尽文件描述符。对于并行性,如果限制为大约 256 个描述符,则在每次迭代中创建 4 个文件描述符的循环的 100 次迭代可能 运行 会遇到麻烦。是的,你很快就关闭了其中一些,但速度够快吗?这还不清楚。调度的不确定性很容易解释不同的行为。

The way I understand openmp is that it goes into the loop body n times at a time where n is the number of threads (am I wrong?). So at any single time I should never have more than n*2 file descriptors which on my machine should be around 24.

大概是n*4个文件描述符,但是并行度可能有限制。我对 OpenMP 不够熟悉,无法对此发表权威评论。除了应该设置的 for 循环之外,还有其他编译指示吗?我不清楚 运行ning 显示的代码在使用 Clang 编译代码时在 Mac 上引入了并行性——与 GCC 9.1 不同,它不会抱怨 #pragma .0 确实在我的默认编译选项下警告未知编译指示。

然而,有了 forks 和 execs 以及线程,生活变得棘手。应该关闭的文件描述符可能没有被关闭,因为文件描述符是 process-level 资源,因此线程 1 可能会创建线程 2 不知道但共享的文件描述符。然后当线程 2 分叉时,线程 1 创建的文件描述符不会关闭,从而阻止 cat 正确检测 EOF,等等

验证这一点的一种方法是使用这样的函数:

#include <sys/stat.h>

static void dump_descriptors(int max_fd)
{
    struct stat sb;
    for (int fd = 0; fd <= max_fd; fd++)
        putchar((fstat(fd, &sb) == 0) ? 'o' : '-');
    putchar('\n');
    fflush(stdout);
}

并在子代码中,用一个合适的数字调用它(也许是 64 — 可能有使用大到 404 的数字的情况)。虽然在函数中尝试使用 flockfile(stdout)funlockfile(stdout) 很诱人,但是如果只在子进程中调用它是没有意义的,因为子进程是 single-threaded 因此不会有来自进程中其他线程的任何干扰。但是,不同的进程可能会干扰彼此的输出,这是非常可行的。

如果您要从父进程线程使用 dump_descriptor(),则在循环之前添加 flockfile(stdout);,在 fflush() 调用之后添加 funlockfile(stdout);。我不确定这会在多大程度上干扰问题;它通过该函数强制执行单线程,因为 none 的其他线程可以写入 stdout 而一个线程已将其锁定。

然而,当我使用稍微修改过的代码版本测试它时,它在 'good' 和 'bad' 行之前以及 dump_descriptors() 输出之前输出 PID,我从来没有看到任何操作的交错。我得到如下输出:

14128: ooooooo----------------------------------------------------------
14128: good
14129: ooooooo----------------------------------------------------------
14129: good
14130: ooooooo----------------------------------------------------------
14130: good
…
14225: ooooooo----------------------------------------------------------
14225: good
14226: ooooooo----------------------------------------------------------
14226: good
14227: ooooooo----------------------------------------------------------
14227: good

这强烈表明代码中没有并行性。当没有并行性时,您将看不到问题。每次都有 4 个管道描述符,代码小心地关闭它们。

考虑在您的场景中将描述符映射重定向到一个文件(或每个子文件一个文件),您实际上可能会获得严重的并行性。

请注意,将线程与 fork() 混合使用本身就很困难(因为 John Bollinger )——您通常使用一种或另一种机制,而不是同时使用两种机制。

当您的 main 挂起时,在单独的会话中键入 lsof。我想你会看到类似的东西:

....
cat       5323                 steve  txt       REG              252,0    52080    6553613 /bin/cat
cat       5323                 steve  mem       REG              252,0  1868984   17302005 /lib/x86_64-linux-gnu/libc-2.23.so
cat       5323                 steve  mem       REG              252,0   162632   17301981 /lib/x86_64-linux-gnu/ld-2.23.so
cat       5323                 steve  mem       REG              252,0  1668976   12849924 /usr/lib/locale/locale-archive
cat       5323                 steve    0r     FIFO               0,10      0t0      32079 pipe
cat       5323                 steve    1w     FIFO               0,10      0t0      32080 pipe
cat       5323                 steve    2u      CHR              136,0      0t0          3 /dev/pts/0
cat       5323                 steve    3r     FIFO               0,10      0t0      32889 pipe
cat       5323                 steve    4w     FIFO               0,10      0t0      32889 pipe
cat       5323                 steve    6r     FIFO               0,10      0t0      32890 pipe
cat       5323                 steve    7r     FIFO               0,10      0t0      34359 pipe
cat       5323                 steve    8w     FIFO               0,10      0t0      32890 pipe
cat       5323                 steve   10r     FIFO               0,10      0t0      22504 pipe
cat       5323                 steve   15w     FIFO               0,10      0t0      22504 pipe
cat       5323                 steve   16r     FIFO               0,10      0t0      22505 pipe
cat       5323                 steve   31w     FIFO               0,10      0t0      22505 pipe
cat       5323                 steve   35r     FIFO               0,10      0t0      17257 pipe
cat       5323                 steve   47r     FIFO               0,10      0t0      31304 pipe
cat       5323                 steve   49r     FIFO               0,10      0t0      30264 pipe

这就提出了一个问题,这些管道是从哪里来的?您的主循环不再是单个循环,而是一组不同步的并行循环。看看下面的样板:

void *tdispatch(void *p) {
      int to[2], from[2];
      pipe(to);
      pipe(from);
      if (fork() == 0) {
          ...
      } else {
          ...
          pthread_exit(0); 
     }
}
...
for (int i = 0; i < NCPU; i++) {
    pthread_create(..., tdispatch, ...);
}
for (int i = 0; i < NCPU; i++) {
    pthread_join(...);
}

tdispatch 的多个实例可以交错 pipe(to)、pipe(from) 和 fork() 调用;因此 fds 正在泄漏到这些分叉的进程中。我说泄漏是因为分叉的进程不知道它们在那里。

当管道有缓冲数据或至少有一个写入文件描述符对其打开时,它会继续响应 read() 系统调用。

假设进程5打开了两个管道的正常两端,分别指向管道#10和管道#11;进程 6 有管​​道#12 和管道#13。但是,由于上面的泄漏,进程 5 也有管道#12 的写端,进程 6 有管​​道#10 的写端。进程 5 和 6 永远不会退出,因为它们保持彼此 read-pipe 的打开状态。

解决方案与前面的人所说的差不多:线程和分叉是一个棘手的组合。您必须序列化管道、叉子、initial-close 位才能使其工作。

正如 Jonathan Leffler 和 Mevet 在他们的回答中所解释的那样,问题的原因原来是继承给子进程的打开文件。如果你有那个问题,请阅读他们的答案,如果你仍然不明白或不知道该怎么做,请参考我的答案。

我将以一种我马上就能理解的方式分享我的解释。同时分享我的代码解决方案。

考虑以下场景: 进程 A 打开一个管道(这是两个文件)。

进程 A 生成进程 B 以通过管道与其通信。但是,它还会创建继承管道(两个文件)的进程 C。

现在进程 B 将在管道上连续调用 read(2),这是一个阻塞系统调用。 (它会等到有人写入管道)

进程 A 完成写入并关闭管道的末端。通常这会导致进程 B 中的 read(2) 系统调用失败并且程序将退出(这就是我们想要的)。

但是在我们的例子中,由于进程 C 确实有一个开放的管道写入端,进程 B 中的 read(2) 系统调用不会失败,并且会阻塞等待来自开放 write-end 的写入在进程 C.

进程C刚结束就没事了。

真正的死锁会出现在 B 和 C 彼此持有管道的不同场景中(如 Mevet 的回答中所述)。他们每个人都在等待另一个人关闭他们的管道末端。这永远不会导致死锁。

我的解决方案是在 fork(2)

之后关闭所有我不需要的打开文件
int pid = fork();

if(pid == 0)
{
    int exceptions[2] = {main_to_sub[0], sub_to_main[1]};
    close_all_descriptors(exceptions);
    dup2(main_to_sub[0], STDIN_FILENO);
    dup2(sub_to_main[1], STDOUT_FILENO);

    close(main_to_sub[0]);
    close(sub_to_main[1]);

    char* argv[] = {"cat", NULL};

    execvp("cat", argv);
    exit(1);
}

下面是close_all_descriptors

的实现
#include <fcntl.h>
#include <errno.h>

static int
is_within(int fd, int arr[2])
{
    for(int i = 0; i < 2; i++)
    {
        if(fd == arr[i])
        {
            return 1;
        }
    }

    return 0;
}

static int
fd_is_valid(int fd)
{
    return fcntl(fd, F_GETFD) != -1 || errno != EBADF;
}

static void
close_all_descriptors(int exceptions[2])
{
    // getdtablesize returns the max number of files that can be open. It's 1024 on my system
    const int max_fd = getdtablesize();

    // starting at 3 because I don't want to close stdin/out/err
    // let dup2(2) do that
    for (int fd = 3; fd <= max_fd; fd++)
    {
        if(fd_is_valid(fd) && !is_within(fd, exceptions))
        {
            close(fd);
        }
    }
}