python - 并行运行多个子进程,并允许在 bash 命令失败时重试?
python - run multiple subprocesses in parallel but allow to retry when a bash command is failed?
我有一个 Python 脚本,用于运行多个 bash 命令(每个命令在各自的子进程中运行),并且同时运行的子进程数量上限是固定的。
代码如下,但是我不知道如何为失败的命令(返回状态码 != 0)添加重试。
例如,如果某个命令失败,应将其重新加入循环;在失败 3 次之后,就不应再重新加入该命令。
import os
import subprocess
# Upper bound on the number of child processes alive at the same time.
NUMBER_OF_PROCESSES = 3


def _reap_finished(running, exit_codes, poll_interval=0.05):
    """Wait briefly, then move every finished child from *running* into *exit_codes*.

    *running* maps each live ``Popen`` object to the index of its command;
    *exit_codes* is the result list indexed the same way. *running* must not
    be empty.
    """
    # Block on one arbitrary child for at most ``poll_interval`` seconds so
    # the caller's loop does not spin, then poll all children because any of
    # them may have ended in the meantime.
    try:
        next(iter(running)).wait(timeout=poll_interval)
    except subprocess.TimeoutExpired:
        pass
    for process in [p for p in running if p.poll() is not None]:
        exit_codes[running.pop(process)] = process.returncode


def run_bash_commands_in_parallel(commands):
    """
    Run a list of bash commands in parallel with maximum number of processes.

    Each command is a shell string started with ``shell=True``. At most
    ``NUMBER_OF_PROCESSES`` children (minimum 1) are alive at any time.

    Args:
        commands: Sized sequence of shell command strings.

    Returns:
        A list with one exit code per command, in the same order as
        *commands*. A non-zero entry marks a failed command, which the caller
        can pass to this function again to retry it.
    """
    max_processes = max(1, NUMBER_OF_PROCESSES)
    total = len(commands)
    exit_codes = [None] * total
    running = {}  # Popen -> index of its command in ``commands``

    for index, command in enumerate(commands):
        print("# Processing task {} / {}".format(index + 1, total))
        running[subprocess.Popen(command, shell=True)] = index
        # Deliberately not using os.wait() here: it reaps a child outside of
        # Popen's bookkeeping, so the child's real exit status would no longer
        # be available through poll()/returncode and failures could not be
        # detected. Waiting through the Popen objects keeps the status intact.
        while len(running) >= max_processes:
            _reap_finished(running, exit_codes)

    # Drain the children that are still running once everything was started.
    while running:
        _reap_finished(running, exit_codes)

    # Retrying is left to the caller: every command whose exit code is
    # non-zero can be submitted again (e.g. up to 3 times).
    return exit_codes
# Example workload: three gdalwarp invocations handed to the runner at once.
commands = [
    'gdalwarp aaaa',
    'gdalwarp bbbb',
    'gdalwarp ccc',
]
run_bash_commands_in_parallel(commands)
您可以使用两个队列来解决这个问题:一个存放等待中的作业,另一个存放当前正在运行的作业。
我们不只是存放命令本身,而是把命令和尝试次数一起存入 waiting 队列,把 Popen 对象存入 running 队列。
然后我们检查所有正在运行的命令的当前状态:成功完成的命令被移除;仍在运行的命令放回 running 队列;如果作业失败,则把尝试计数加一并将其放回 waiting 队列,但前提是尝试次数尚未超过 max_tries 参数。
我们重复这一过程,直到两个队列中都不再有作业:
import subprocess
import time
from collections import deque
def run_bash_commands_in_parallel(commands, max_tries, n_parallel,
                                  poll_interval=0.5):
    """
    Run a list of commands in parallel with a maximum number of processes,
    retrying commands that exit with a non-zero status.

    Args:
        commands: Iterable of commands, each in a form accepted by
            ``subprocess.Popen`` (e.g. a list of program and arguments).
        max_tries: Maximum number of times a command is started. Values
            below 1 behave like 1 (the command runs once, no retry).
        n_parallel: Maximum number of simultaneously running processes.
        poll_interval: Seconds to sleep between two polls of the running
            processes.

    Raises:
        ValueError: If there are commands to run but ``n_parallel`` is less
            than 1; no job could ever be started and the loop would never end.

    Progress is reported with ``print``; nothing is returned. A command that
    cannot be started at all (``OSError``) is reported and not retried.
    """
    # we use a tuple of (command, tries) to store the information
    # how often a command was already tried.
    waiting = deque((command, 1) for command in commands)
    running = deque()

    if waiting and n_parallel < 1:
        raise ValueError(f'n_parallel must be at least 1, got {n_parallel}')

    while waiting or running:
        print(f'Running: {len(running)}, Waiting: {len(waiting)}')

        # if less than n_parallel jobs are running and we have waiting jobs,
        # start new jobs
        while waiting and len(running) < n_parallel:
            command, tries = waiting.popleft()
            try:
                process = subprocess.Popen(command)
            except OSError:
                print(f'Failed to start command {command}')
            else:
                running.append((process, command, tries))
                print(f"Started task {command}")

        # poll running commands, keeping only those that have not finished
        still_running = deque()
        for process, command, tries in running:
            ret = process.poll()
            if ret is None:
                still_running.append((process, command, tries))
            elif ret == 0:
                print(f'Command {command} finished successfully')
            elif tries < max_tries:
                # retry errored jobs
                waiting.append((command, tries + 1))
            else:
                print(f'Command: {command} errored after {max_tries} tries')
        running = still_running

        # sleep a bit to reduce CPU usage, but only while something is
        # actually running: queued retries can start right away and there is
        # nothing to wait for once the last job has finished
        if running:
            time.sleep(poll_interval)

    print('All tasks done')
if __name__ == '__main__':
    # Demo workload: quick echo commands, an executable name that cannot be
    # started, a few sleeps to keep the slots busy, and an `ls` on a path
    # that the sample run below shows failing on every try.
    commands = [
        ['echo', 'foo'], ['echo', 'bar'],
        ['non-existing-command'],
        ['sleep', '2'], ['sleep', '3'], ['sleep', '2'],
        ['ls', 'asnddlksandaslk'],
    ]
    run_bash_commands_in_parallel(commands, max_tries=3, n_parallel=3)
结果:
python run_stuff.py
Running: 0, Waiting: 7
Started task ['echo', 'foo']
foo
Started task ['echo', 'bar']
bar
Failed to start command ['non-existing-command']
Started task ['sleep', '2']
Command ['echo', 'foo'] finished successfully
Command ['echo', 'bar'] finished successfully
Running: 1, Waiting: 3
Started task ['sleep', '3']
Started task ['sleep', '2']
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Command ['sleep', '2'] finished successfully
Running: 2, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '2'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 2, Waiting: 0
Running: 1, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '3'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Running: 0, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Command: ['ls', 'asnddlksandaslk'] errored after 3 tries
All tasks done
我有一个 Python 脚本,用于运行多个 bash 命令(每个命令在各自的子进程中运行),并且同时运行的子进程数量上限是固定的。
代码如下,但是我不知道如何为失败的命令(返回状态码 != 0)添加重试。
例如,如果某个命令失败,应将其重新加入循环;在失败 3 次之后,就不应再重新加入该命令。
import os
import subprocess
# Upper bound on the number of child processes alive at the same time.
NUMBER_OF_PROCESSES = 3


def _reap_finished(running, exit_codes, poll_interval=0.05):
    """Wait briefly, then move every finished child from *running* into *exit_codes*.

    *running* maps each live ``Popen`` object to the index of its command;
    *exit_codes* is the result list indexed the same way. *running* must not
    be empty.
    """
    # Block on one arbitrary child for at most ``poll_interval`` seconds so
    # the caller's loop does not spin, then poll all children because any of
    # them may have ended in the meantime.
    try:
        next(iter(running)).wait(timeout=poll_interval)
    except subprocess.TimeoutExpired:
        pass
    for process in [p for p in running if p.poll() is not None]:
        exit_codes[running.pop(process)] = process.returncode


def run_bash_commands_in_parallel(commands):
    """
    Run a list of bash commands in parallel with maximum number of processes.

    Each command is a shell string started with ``shell=True``. At most
    ``NUMBER_OF_PROCESSES`` children (minimum 1) are alive at any time.

    Args:
        commands: Sized sequence of shell command strings.

    Returns:
        A list with one exit code per command, in the same order as
        *commands*. A non-zero entry marks a failed command, which the caller
        can pass to this function again to retry it.
    """
    max_processes = max(1, NUMBER_OF_PROCESSES)
    total = len(commands)
    exit_codes = [None] * total
    running = {}  # Popen -> index of its command in ``commands``

    for index, command in enumerate(commands):
        print("# Processing task {} / {}".format(index + 1, total))
        running[subprocess.Popen(command, shell=True)] = index
        # Deliberately not using os.wait() here: it reaps a child outside of
        # Popen's bookkeeping, so the child's real exit status would no longer
        # be available through poll()/returncode and failures could not be
        # detected. Waiting through the Popen objects keeps the status intact.
        while len(running) >= max_processes:
            _reap_finished(running, exit_codes)

    # Drain the children that are still running once everything was started.
    while running:
        _reap_finished(running, exit_codes)

    # Retrying is left to the caller: every command whose exit code is
    # non-zero can be submitted again (e.g. up to 3 times).
    return exit_codes
# Example workload: three gdalwarp invocations handed to the runner at once.
commands = [
    'gdalwarp aaaa',
    'gdalwarp bbbb',
    'gdalwarp ccc',
]
run_bash_commands_in_parallel(commands)
您可以使用两个队列来解决这个问题:一个存放等待中的作业,另一个存放当前正在运行的作业。
我们不只是存放命令本身,而是把命令和尝试次数一起存入 waiting 队列,把 Popen 对象存入 running 队列。
然后我们检查所有正在运行的命令的当前状态:成功完成的命令被移除;仍在运行的命令放回 running 队列;如果作业失败,则把尝试计数加一并将其放回 waiting 队列,但前提是尝试次数尚未超过 max_tries 参数。
我们重复这一过程,直到两个队列中都不再有作业:
import subprocess
import time
from collections import deque
def run_bash_commands_in_parallel(commands, max_tries, n_parallel,
                                  poll_interval=0.5):
    """
    Run a list of commands in parallel with a maximum number of processes,
    retrying commands that exit with a non-zero status.

    Args:
        commands: Iterable of commands, each in a form accepted by
            ``subprocess.Popen`` (e.g. a list of program and arguments).
        max_tries: Maximum number of times a command is started. Values
            below 1 behave like 1 (the command runs once, no retry).
        n_parallel: Maximum number of simultaneously running processes.
        poll_interval: Seconds to sleep between two polls of the running
            processes.

    Raises:
        ValueError: If there are commands to run but ``n_parallel`` is less
            than 1; no job could ever be started and the loop would never end.

    Progress is reported with ``print``; nothing is returned. A command that
    cannot be started at all (``OSError``) is reported and not retried.
    """
    # we use a tuple of (command, tries) to store the information
    # how often a command was already tried.
    waiting = deque((command, 1) for command in commands)
    running = deque()

    if waiting and n_parallel < 1:
        raise ValueError(f'n_parallel must be at least 1, got {n_parallel}')

    while waiting or running:
        print(f'Running: {len(running)}, Waiting: {len(waiting)}')

        # if less than n_parallel jobs are running and we have waiting jobs,
        # start new jobs
        while waiting and len(running) < n_parallel:
            command, tries = waiting.popleft()
            try:
                process = subprocess.Popen(command)
            except OSError:
                print(f'Failed to start command {command}')
            else:
                running.append((process, command, tries))
                print(f"Started task {command}")

        # poll running commands, keeping only those that have not finished
        still_running = deque()
        for process, command, tries in running:
            ret = process.poll()
            if ret is None:
                still_running.append((process, command, tries))
            elif ret == 0:
                print(f'Command {command} finished successfully')
            elif tries < max_tries:
                # retry errored jobs
                waiting.append((command, tries + 1))
            else:
                print(f'Command: {command} errored after {max_tries} tries')
        running = still_running

        # sleep a bit to reduce CPU usage, but only while something is
        # actually running: queued retries can start right away and there is
        # nothing to wait for once the last job has finished
        if running:
            time.sleep(poll_interval)

    print('All tasks done')
if __name__ == '__main__':
    # Demo workload: quick echo commands, an executable name that cannot be
    # started, a few sleeps to keep the slots busy, and an `ls` on a path
    # that the sample run below shows failing on every try.
    commands = [
        ['echo', 'foo'], ['echo', 'bar'],
        ['non-existing-command'],
        ['sleep', '2'], ['sleep', '3'], ['sleep', '2'],
        ['ls', 'asnddlksandaslk'],
    ]
    run_bash_commands_in_parallel(commands, max_tries=3, n_parallel=3)
结果:
python run_stuff.py
Running: 0, Waiting: 7
Started task ['echo', 'foo']
foo
Started task ['echo', 'bar']
bar
Failed to start command ['non-existing-command']
Started task ['sleep', '2']
Command ['echo', 'foo'] finished successfully
Command ['echo', 'bar'] finished successfully
Running: 1, Waiting: 3
Started task ['sleep', '3']
Started task ['sleep', '2']
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Command ['sleep', '2'] finished successfully
Running: 2, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '2'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 2, Waiting: 0
Running: 1, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '3'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Running: 0, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Command: ['ls', 'asnddlksandaslk'] errored after 3 tries
All tasks done