How to run shell-like pipe tasks with a limited number of forks?

I have a simple program in which I want to simulate not having enough fork capacity, so I limit the number of forks while running a pipeline.

Take this shell-like pipeline job, implemented in C++:

ls | cat | cat | cat | cat | cat | cat | cat | cat

Here is my code, which runs it with pipe() and fork():

#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

const int fork_limit = 3;
int fork_counter = 0;

static void sig_chld_handler(int signo) {
  int status;
  pid_t pid;
  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    printf("received SIGCHLD from child process %d\n", pid);
    fork_counter -= 1;
    fprintf(stdout, "counter --, %d\n", fork_counter);
  }
}

int main(int argc, char **argv) {

  signal(SIGCHLD, sig_chld_handler);

  char **cmds[9];

  char *p1_args[] = {"ls", NULL};
  char *p2_args[] = {"cat", NULL};

  cmds[0] = p1_args;
  cmds[1] = p2_args;
  cmds[2] = p2_args;
  cmds[3] = p2_args;
  cmds[4] = p2_args;
  cmds[5] = p2_args;
  cmds[6] = p2_args;
  cmds[7] = p2_args;
  cmds[8] = p2_args;


  int pipes[16];
  pipe(pipes);     // sets up 1st pipe
  pipe(pipes + 2); // sets up 2nd pipe
  pipe(pipes + 4);
  pipe(pipes + 6);
  pipe(pipes + 8);
  pipe(pipes + 10);
  pipe(pipes + 12);
  pipe(pipes + 14);


  pid_t pid;

  for (int i = 0; i < 9; i++) {

    // === comment this part to run correctly ===
    while (fork_limit < fork_counter) {
      usleep(10000);
    }
    // ===

    pid = fork();
    if (pid == 0) {
      fprintf(stdout, "fork p%d\n", i);

      // read
      if (i != 0) {
        if (dup2(pipes[(i - 1) * 2], 0) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      // write
      if (i != 8) {
        if (dup2(pipes[i * 2 + 1], 1) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      for (int j = 0; j < 16; j++) {
        close(pipes[j]);
      }

      execvp(*cmds[i], cmds[i]);
    } else {
      fork_counter += 1;
      fprintf(stdout, "counter ++, %d \n", fork_counter);
    }
  }

  for (int j = 0; j < 16; j++) {
    close(pipes[j]);
  }

  waitpid(pid, NULL, 0); // wait the last one.

  std::cout << "Parent done." << std::endl;
}

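The while (fork_limit < fork_counter) loop is how I limit the number of children. If I remove the while block, the code runs fine, but with it the program hangs.

I assumed the earlier children would exit, so that fork_counter -= 1 runs and a new child can be forked, but that is not what happens and I cannot figure out why.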


Output without the while loop:

counter ++, 1 
counter ++, 2 
fork p0
fork p1
counter ++, 3 
fork p2
counter ++, 4 
counter ++, 5 
fork p3
fork p4
counter ++, 6 
fork p5
counter ++, 7 
counter ++, 8 
fork p6
fork p7
counter ++, 9 
fork p8
received SIGCHLD from child process 13316
counter --, 8
Applications
Desktop
Documents
Downloads
Library
Movies
Music
Pictures
received SIGCHLD from child process 13319
counter --, 7
received SIGCHLD from child process 13318
counter --, 6
received SIGCHLD from child process 13317
counter --, 5
received SIGCHLD from child process 13320
counter --, 4
received SIGCHLD from child process 13322
counter --, 3
received SIGCHLD from child process 13321
counter --, 2
received SIGCHLD from child process 13323
counter --, 1
received SIGCHLD from child process 13324
counter --, 0
Parent done.

Output with the while loop, that is, with the number of forks limited:

counter ++, 1 
counter ++, 2 
fork p0
fork p1
counter ++, 3 
counter ++, 4 
fork p2
fork p3
received SIGCHLD from child process 13291
counter --, 3
counter ++, 4 
fork p4

(hang)

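The main program does the following, in order:

  1. Pre-creates all the pipes
  2. Forks the children that use the pipes (each child closes all the pipes it inherited)
  3. Closes all the pipes

The problem is the timing of 'close all pipes'. Main waits for the first children to finish (while (fork_limit < fork_counter)) before it can complete step 2, so it does not reach step 3 until every child has been forked.

However, the cat children (for example, the first cat) cannot finish until every process holding the write end of their input pipe has closed it. That includes main, which is itself waiting for them to finish. This is effectively a deadlock.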
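
The rule behind this deadlock can be checked in isolation. The following is a minimal single-process sketch, not part of the original program: the function name eof_requires_all_writers_closed is hypothetical, and a dup() of the write end stands in for the copy that main keeps open. The read end is made non-blocking so the check reports "would block" instead of hanging.

```cpp
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper: returns true if a read on an empty pipe only
// reports EOF once *every* copy of the write end has been closed.
bool eof_requires_all_writers_closed() {
  int fds[2];
  if (pipe(fds) < 0) return false;

  // Non-blocking read end, so an unavailable read fails with EAGAIN
  // instead of blocking the way cat does in the pipeline.
  int flags = fcntl(fds[0], F_GETFL);
  if (flags < 0 || fcntl(fds[0], F_SETFL, flags | O_NONBLOCK) < 0) return false;

  // Second copy of the write end: plays the role of main's open descriptor.
  int main_copy = dup(fds[1]);
  if (main_copy < 0) return false;

  // The "upstream child" exits and its write end goes away.
  close(fds[1]);

  char c;
  ssize_t n = read(fds[0], &c, 1);
  // One writer is still open, so this is "no data yet", not EOF.
  bool blocked_while_copy_open =
      (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));

  // Main finally closes its copy: now read() returns 0, i.e. EOF.
  close(main_copy);
  bool eof_after_close = (read(fds[0], &c, 1) == 0);

  close(fds[0]);
  return blocked_while_copy_open && eof_after_close;
}
```

In the hanging program, cat is in the first state: its upstream writer has exited, but main still holds a write end, so a blocking read never returns.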

Consider a small modification to the main process so that it closes each child's input pipe as soon as that child has been forked:

if ( fork() == 0 ) {
   // Child
   ...

} else {
   // Main - close pipes ASAP.
   // Child i reads from pipe i-1; the first child has no input pipe.
   if ( i != 0 ) {
      close(pipes[(i-1)*2]) ;
      close(pipes[(i-1)*2+1]);
   }
   fork_counter += 1;
   fprintf(stdout, "counter ++, %d \n", fork_counter);
}

Some changes to how the children close their pipes may also be needed.
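
One way to avoid tracking which descriptors each process still holds is to create every pipe just before forking the stage that writes to it, and to have main close its copies right after the fork. The sketch below is an alternative under stated assumptions, not the code from the question: run_pipeline, max_children and out_fd are hypothetical names, and a blocking waitpid(-1, ...) replaces the SIGCHLD handler and the usleep polling. Note that waitpid(-1, ...) reaps any child of the process, so it assumes the pipeline's children are the only ones.

```cpp
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: runs cmds[0] | cmds[1] | ... with at most
// max_children live children. The last stage writes to out_fd.
// Returns how many stages exited with status 0, or -1 on pipe/fork failure.
int run_pipeline(const std::vector<std::vector<std::string>>& cmds,
                 int max_children, int out_fd = STDOUT_FILENO) {
  if (max_children < 1) max_children = 1;
  int live = 0, ok = 0;
  int prev_read = -1;  // read end of the pipe that feeds the next stage

  // Block until one child exits; give up counting if there is none left.
  auto reap_one = [&]() {
    int status = 0;
    if (waitpid(-1, &status, 0) <= 0) { live = 0; return; }
    --live;
    if (WIFEXITED(status) && WEXITSTATUS(status) == 0) ++ok;
  };

  for (std::size_t i = 0; i < cmds.size(); ++i) {
    while (live >= max_children) reap_one();

    const bool last = (i + 1 == cmds.size());
    int fds[2] = {-1, -1};
    if (!last && pipe(fds) < 0) return -1;

    pid_t pid = fork();
    if (pid < 0) {
      if (!last) { close(fds[0]); close(fds[1]); }
      return -1;
    }
    if (pid == 0) {
      // Child: wire stdin/stdout, then drop every pipe descriptor.
      if (prev_read != -1) {
        if (dup2(prev_read, STDIN_FILENO) < 0) _exit(126);
        close(prev_read);
      }
      int out = last ? out_fd : fds[1];
      if (out != STDOUT_FILENO && dup2(out, STDOUT_FILENO) < 0) _exit(126);
      if (!last) { close(fds[0]); close(fds[1]); }

      std::vector<char*> argv;
      for (const std::string& a : cmds[i])
        argv.push_back(const_cast<char*>(a.c_str()));
      argv.push_back(nullptr);
      execvp(argv[0], argv.data());
      _exit(127);  // only reached if execvp failed
    }

    // Main: close its copies immediately so readers can see EOF.
    ++live;
    if (prev_read != -1) close(prev_read);
    if (!last) { close(fds[1]); prev_read = fds[0]; }
  }

  while (live > 0) reap_one();
  return ok;
}
```

A call such as run_pipeline(cmds, 3) with cmds holding {"ls"} followed by eight {"cat"} entries would mirror the pipeline in the question. As with the original approach, when the limit is smaller than the number of stages, a stage can only finish before its reader starts if its output fits in the pipe buffer; larger output would block that stage and stall the pipeline.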

Thanks to @dash-o for the answer.

This version works:

#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

const int fork_limit = 4;
int fork_counter = 0;

static void sig_chld_handler(int signo) {
  int status;
  pid_t pid;
  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    printf("received SIGCHLD from child process %d\n", pid);
    fork_counter -= 1;
    fprintf(stdout, "counter --, %d\n", fork_counter);
  }
}

int main(int argc, char **argv) {

  signal(SIGCHLD, sig_chld_handler);

  char **cmds[9];

  char *p1_args[] = {"ls", NULL};
  char *p2_args[] = {"cat", NULL};

  cmds[0] = p1_args;
  cmds[1] = p2_args;
  cmds[2] = p2_args;
  cmds[3] = p2_args;
  cmds[4] = p2_args;
  cmds[5] = p2_args;
  cmds[6] = p2_args;
  cmds[7] = p2_args;
  cmds[8] = p2_args;

  int pipes[16];
  pipe(pipes);     // sets up 1st pipe
  pipe(pipes + 2); // sets up 2nd pipe
  pipe(pipes + 4);
  pipe(pipes + 6);
  pipe(pipes + 8);
  pipe(pipes + 10);
  pipe(pipes + 12);
  pipe(pipes + 14);

  pid_t pid;

  for (int i = 0; i < 9; i++) {

    while (fork_limit < fork_counter) {
      usleep(10000);
    }

    pid = fork();
    if (pid == 0) {
      fprintf(stdout, "fork p%d\n", i);

      // read
      if (i != 0) {
        if (dup2(pipes[(i - 1) * 2], 0) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      // write
      if (i != 8) {
        if (dup2(pipes[i * 2 + 1], 1) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      for (int j = 0; j < 16; j++) {
        close(pipes[j]);
      }

      execvp(*cmds[i], cmds[i]);
    } else {

      if (i != 0) {
        close(pipes[(i - 1) * 2]);
        close(pipes[(i - 1) * 2 + 1]);
      }

      fork_counter += 1;
      fprintf(stdout, "counter ++, %d \n", fork_counter);
    }
  }

  for (int j = 0; j < 16; j++) {
    close(pipes[j]);
  }

  waitpid(pid, NULL, 0); // wait the last one.

  std::cout << "Parent done." << std::endl;
}