如何运行 shell-liked pipe tasks with limited fork number?
How to run shell-liked pipe tasks with limited fork number?
我有一个简单的程序,我想模拟我没有足够的fork容量的情况,所以我在做管道任务时限制了fork数量。
让用 C++ 编写的 shell-liked 管道作业:
ls | cat | cat | cat | cat | cat | cat | cat | cat
我有代码,运行 pipe()
和 fork()
:
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>
const int fork_limit = 3;
int fork_counter = 0;
static void sig_chld_handler(int signo) {
int status;
pid_t pid;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
printf("received SIGCHLD from child process %d\n", pid);
fork_counter -= 1;
fprintf(stdout, "counter --, %d\n", fork_counter);
}
}
int main(int argc, char **argv) {
signal(SIGCHLD, sig_chld_handler);
char **cmds[9];
char *p1_args[] = {"ls", NULL};
char *p2_args[] = {"cat", NULL};
cmds[0] = p1_args;
cmds[1] = p2_args;
cmds[2] = p2_args;
cmds[3] = p2_args;
cmds[4] = p2_args;
cmds[5] = p2_args;
cmds[6] = p2_args;
cmds[7] = p2_args;
cmds[8] = p2_args;
int pipes[16];
pipe(pipes); // sets up 1st pipe
pipe(pipes + 2); // sets up 2nd pipe
pipe(pipes + 4);
pipe(pipes + 6);
pipe(pipes + 8);
pipe(pipes + 10);
pipe(pipes + 12);
pipe(pipes + 14);
pid_t pid;
for (int i = 0; i < 9; i++) {
// === comment this part to run correctly ===
while (fork_limit < fork_counter) {
usleep(10000);
}
// ===
pid = fork();
if (pid == 0) {
fprintf(stdout, "fork p%d\n", i);
// read
if (i != 0) {
if (dup2(pipes[(i - 1) * 2], 0) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
// write
if (i != 8) {
if (dup2(pipes[i * 2 + 1], 1) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
execvp(*cmds[i], cmds[i]);
} else {
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
waitpid(pid, NULL, 0); // wait the last one.
std::cout << "Parent done." << std::endl;
}
行while (fork_limit < fork_counter)
是我限制的child号。
如果我删除 while 块,代码 运行 很好,但如果我添加它,它会挂起。
我假设之前的children会死,这样fork_counter -= 1
,新的child可以分叉,但是行为不是和我想不通为什么。
没有 while
.
的结果
counter ++, 1
counter ++, 2
fork p0
fork p1
counter ++, 3
fork p2
counter ++, 4
counter ++, 5
fork p3
fork p4
counter ++, 6
fork p5
counter ++, 7
counter ++, 8
fork p6
fork p7
counter ++, 9
fork p8
received SIGCHLD from child process 13316
counter --, 8
Applications
Desktop
Documents
Downloads
Library
Movies
Music
Pictures
received SIGCHLD from child process 13319
counter --, 7
received SIGCHLD from child process 13318
counter --, 6
received SIGCHLD from child process 13317
counter --, 5
received SIGCHLD from child process 13320
counter --, 4
received SIGCHLD from child process 13322
counter --, 3
received SIGCHLD from child process 13321
counter --, 2
received SIGCHLD from child process 13323
counter --, 1
received SIGCHLD from child process 13324
counter --, 0
Parent done.
while
的结果,这意味着我限制了分叉数量。
counter ++, 1
counter ++, 2
fork p0
fork p1
counter ++, 3
counter ++, 4
fork p2
fork p3
received SIGCHLD from child process 13291
counter --, 3
counter ++, 4
fork p4
(hang)
main
程序(按顺序)执行以下操作:
- Pre-create所有管道
- Fork children 使用管道(每个 children 关闭所有继承的管道)
- 关闭所有管道
问题出在 'close all pipes' 的时间上。因为 main
正在等待第一个 children 完成 (while (fork_limit < fork_counter)
),然后才能完成步骤 #2。
然而,cat
children(例如,第一个 cat
)在其输入管道被所有进程关闭之前无法完成,包括他 main
,这是在等待他们完成。实际上是一个僵局。
考虑对 main
进程进行小的修改,一旦 children 分叉,这将关闭每个 children 的管道:
if ( fork() ) {
// Children
...
} else {
// Main - close pipes ASAP.
close(pipes[(i-1)*2]) ;
close(pipes[(i-1)*2+1]);
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
可能还需要对 children 中的管道关闭进行一些修改。
感谢@dash-o 的回答
它的工作原理是:
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>
const int fork_limit = 4;
int fork_counter = 0;
static void sig_chld_handler(int signo) {
int status;
pid_t pid;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
printf("received SIGCHLD from child process %d\n", pid);
fork_counter -= 1;
fprintf(stdout, "counter --, %d\n", fork_counter);
}
}
int main(int argc, char **argv) {
signal(SIGCHLD, sig_chld_handler);
char **cmds[9];
char *p1_args[] = {"ls", NULL};
char *p2_args[] = {"cat", NULL};
cmds[0] = p1_args;
cmds[1] = p2_args;
cmds[2] = p2_args;
cmds[3] = p2_args;
cmds[4] = p2_args;
cmds[5] = p2_args;
cmds[6] = p2_args;
cmds[7] = p2_args;
cmds[8] = p2_args;
int pipes[16];
pipe(pipes); // sets up 1st pipe
pipe(pipes + 2); // sets up 2nd pipe
pipe(pipes + 4);
pipe(pipes + 6);
pipe(pipes + 8);
pipe(pipes + 10);
pipe(pipes + 12);
pipe(pipes + 14);
pid_t pid;
for (int i = 0; i < 9; i++) {
while (fork_limit < fork_counter) {
usleep(10000);
}
pid = fork();
if (pid == 0) {
fprintf(stdout, "fork p%d\n", i);
// read
if (i != 0) {
if (dup2(pipes[(i - 1) * 2], 0) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
// write
if (i != 8) {
if (dup2(pipes[i * 2 + 1], 1) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
execvp(*cmds[i], cmds[i]);
} else {
if (i != 0) {
close(pipes[(i - 1) * 2]);
close(pipes[(i - 1) * 2 + 1]);
}
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
waitpid(pid, NULL, 0); // wait the last one.
std::cout << "Parent done." << std::endl;
}
我有一个简单的程序,我想模拟我没有足够的fork容量的情况,所以我在做管道任务时限制了fork数量。
让用 C++ 编写的 shell-liked 管道作业:
ls | cat | cat | cat | cat | cat | cat | cat | cat
我有代码,运行 pipe()
和 fork()
:
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>
const int fork_limit = 3;
int fork_counter = 0;
static void sig_chld_handler(int signo) {
int status;
pid_t pid;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
printf("received SIGCHLD from child process %d\n", pid);
fork_counter -= 1;
fprintf(stdout, "counter --, %d\n", fork_counter);
}
}
int main(int argc, char **argv) {
signal(SIGCHLD, sig_chld_handler);
char **cmds[9];
char *p1_args[] = {"ls", NULL};
char *p2_args[] = {"cat", NULL};
cmds[0] = p1_args;
cmds[1] = p2_args;
cmds[2] = p2_args;
cmds[3] = p2_args;
cmds[4] = p2_args;
cmds[5] = p2_args;
cmds[6] = p2_args;
cmds[7] = p2_args;
cmds[8] = p2_args;
int pipes[16];
pipe(pipes); // sets up 1st pipe
pipe(pipes + 2); // sets up 2nd pipe
pipe(pipes + 4);
pipe(pipes + 6);
pipe(pipes + 8);
pipe(pipes + 10);
pipe(pipes + 12);
pipe(pipes + 14);
pid_t pid;
for (int i = 0; i < 9; i++) {
// === comment this part to run correctly ===
while (fork_limit < fork_counter) {
usleep(10000);
}
// ===
pid = fork();
if (pid == 0) {
fprintf(stdout, "fork p%d\n", i);
// read
if (i != 0) {
if (dup2(pipes[(i - 1) * 2], 0) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
// write
if (i != 8) {
if (dup2(pipes[i * 2 + 1], 1) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
execvp(*cmds[i], cmds[i]);
} else {
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
waitpid(pid, NULL, 0); // wait the last one.
std::cout << "Parent done." << std::endl;
}
行while (fork_limit < fork_counter)
是我限制的child号。
如果我删除 while 块,代码 运行 很好,但如果我添加它,它会挂起。
我假设之前的children会死,这样fork_counter -= 1
,新的child可以分叉,但是行为不是和我想不通为什么。
没有 while
.
counter ++, 1
counter ++, 2
fork p0
fork p1
counter ++, 3
fork p2
counter ++, 4
counter ++, 5
fork p3
fork p4
counter ++, 6
fork p5
counter ++, 7
counter ++, 8
fork p6
fork p7
counter ++, 9
fork p8
received SIGCHLD from child process 13316
counter --, 8
Applications
Desktop
Documents
Downloads
Library
Movies
Music
Pictures
received SIGCHLD from child process 13319
counter --, 7
received SIGCHLD from child process 13318
counter --, 6
received SIGCHLD from child process 13317
counter --, 5
received SIGCHLD from child process 13320
counter --, 4
received SIGCHLD from child process 13322
counter --, 3
received SIGCHLD from child process 13321
counter --, 2
received SIGCHLD from child process 13323
counter --, 1
received SIGCHLD from child process 13324
counter --, 0
Parent done.
while
的结果,这意味着我限制了分叉数量。
counter ++, 1
counter ++, 2
fork p0
fork p1
counter ++, 3
counter ++, 4
fork p2
fork p3
received SIGCHLD from child process 13291
counter --, 3
counter ++, 4
fork p4
(hang)
main
程序(按顺序)执行以下操作:
- Pre-create所有管道
- Fork children 使用管道(每个 children 关闭所有继承的管道)
- 关闭所有管道
问题出在 'close all pipes' 的时间上。因为 main
正在等待第一个 children 完成 (while (fork_limit < fork_counter)
),然后才能完成步骤 #2。
然而,cat
children(例如,第一个 cat
)在其输入管道被所有进程关闭之前无法完成,包括他 main
,这是在等待他们完成。实际上是一个僵局。
考虑对 main
进程进行小的修改,一旦 children 分叉,这将关闭每个 children 的管道:
if ( fork() ) {
// Children
...
} else {
// Main - close pipes ASAP.
close(pipes[(i-1)*2]) ;
close(pipes[(i-1)*2+1]);
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
可能还需要对 children 中的管道关闭进行一些修改。
感谢@dash-o 的回答
它的工作原理是:
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>
const int fork_limit = 4;
int fork_counter = 0;
static void sig_chld_handler(int signo) {
int status;
pid_t pid;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
printf("received SIGCHLD from child process %d\n", pid);
fork_counter -= 1;
fprintf(stdout, "counter --, %d\n", fork_counter);
}
}
int main(int argc, char **argv) {
signal(SIGCHLD, sig_chld_handler);
char **cmds[9];
char *p1_args[] = {"ls", NULL};
char *p2_args[] = {"cat", NULL};
cmds[0] = p1_args;
cmds[1] = p2_args;
cmds[2] = p2_args;
cmds[3] = p2_args;
cmds[4] = p2_args;
cmds[5] = p2_args;
cmds[6] = p2_args;
cmds[7] = p2_args;
cmds[8] = p2_args;
int pipes[16];
pipe(pipes); // sets up 1st pipe
pipe(pipes + 2); // sets up 2nd pipe
pipe(pipes + 4);
pipe(pipes + 6);
pipe(pipes + 8);
pipe(pipes + 10);
pipe(pipes + 12);
pipe(pipes + 14);
pid_t pid;
for (int i = 0; i < 9; i++) {
while (fork_limit < fork_counter) {
usleep(10000);
}
pid = fork();
if (pid == 0) {
fprintf(stdout, "fork p%d\n", i);
// read
if (i != 0) {
if (dup2(pipes[(i - 1) * 2], 0) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
// write
if (i != 8) {
if (dup2(pipes[i * 2 + 1], 1) < 0) {
fprintf(stderr, "dup2 error\n");
exit(EXIT_FAILURE);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
execvp(*cmds[i], cmds[i]);
} else {
if (i != 0) {
close(pipes[(i - 1) * 2]);
close(pipes[(i - 1) * 2 + 1]);
}
fork_counter += 1;
fprintf(stdout, "counter ++, %d \n", fork_counter);
}
}
for (int j = 0; j < 16; j++) {
close(pipes[j]);
}
waitpid(pid, NULL, 0); // wait the last one.
std::cout << "Parent done." << std::endl;
}