python - 并行运行多个子进程,并允许在 bash 命令失败时重试?

python - run multiple subprocesses in parallel but allow retrying when a bash command fails?

我有一个 Python 脚本,用于运行多个 bash 命令(每个命令在各自的子进程中运行),并且同时运行的子进程数量上限是固定的。

代码如下,但我不知道如何为失败的命令(返回状态码 != 0)添加重试。

例如,如果某个命令失败,应将其重新加入循环;而在失败 3 次之后,就不应再重新加入该命令。

import os
import subprocess

NUMBER_OF_PROCESSES = 3  # default cap on concurrently running subprocesses


def run_bash_commands_in_parallel(commands, max_processes=None):
    """
    Run a list of bash commands in parallel with maximum number of processes

    Every command is started with ``subprocess.Popen(command, shell=True)``.
    As soon as the number of tracked subprocesses reaches the cap, no further
    command is started until at least one of them has exited.  The function
    returns once every started subprocess has terminated.

    :param commands: sized sequence (``len()`` is used for the progress
        message) of shell command strings.
    :param max_processes: maximum number of subprocesses kept alive at the
        same time; ``None`` (the default) means ``NUMBER_OF_PROCESSES``.
    :raises ValueError: if ``max_processes`` is smaller than 1.
    :return: ``None``.  Exit statuses are not inspected here.
    """
    if max_processes is None:
        max_processes = NUMBER_OF_PROCESSES
    if max_processes < 1:
        raise ValueError("max_processes must be at least 1")

    processes = set()
    total = len(commands)
    poll_timeout = 0.1  # seconds to block on one child before re-checking all

    for i, command in enumerate(commands, start=1):
        print("# Processing task {} / {}".format(i, total))
        processes.add(subprocess.Popen(command, shell=True))

        # Wait for a free slot by asking the Popen objects themselves.
        # os.wait() is deliberately avoided: it is POSIX-only and reaps the
        # child behind Popen's back, so its real exit status would be lost.
        while len(processes) >= max_processes:
            finished = {p for p in processes if p.poll() is not None}
            if finished:
                processes -= finished
                continue
            try:
                # Block briefly on an arbitrary child instead of busy-looping.
                next(iter(processes)).wait(timeout=poll_timeout)
            except subprocess.TimeoutExpired:
                pass

    # How to retry a bash command up to 3 times?

    # Make sure all the remaining child processes have terminated;
    # wait() returns immediately for children that already exited.
    for p in processes:
        p.wait()

# Example workload: three gdalwarp shell commands (the arguments look like
# placeholders), executed through the parallel runner defined in this script.
commands = ['gdalwarp aaaa', 'gdalwarp bbbb', 'gdalwarp ccc']

run_bash_commands_in_parallel(commands)

您可以使用两个队列来解决此问题:一个存放等待中的作业,另一个存放当前正在运行的作业。

我们不只是附加命令,而是将命令和重试计数存储在 waiting 队列中,将 Popen 对象存储在 running 队列中。

然后我们检查所有正在运行的命令的当前状态。成功完成的命令会被移除,仍在运行的命令放回 running 队列;如果作业失败,我们将其尝试计数加一并放回 waiting 队列,但前提是尝试次数尚未达到 max_tries 参数。

我们不断重复这一过程,直到两个队列中都不再有任何作业:

import subprocess
import time
from collections import deque


def run_bash_commands_in_parallel(commands, max_tries, n_parallel,
                                  poll_interval=0.5):
    """
    Run a list of commands in parallel, retrying the ones that fail.

    Each command is handed to ``subprocess.Popen`` as-is (no ``shell=True``),
    so it is normally an argument list such as ``['ls', '-l']``.  At most
    ``n_parallel`` subprocesses run at the same time.  A command that exits
    with a non-zero status is queued again until it has been tried
    ``max_tries`` times.  A command that cannot be started at all (``Popen``
    raises ``OSError``) is reported and dropped without a retry.

    :param commands: iterable of commands accepted by ``subprocess.Popen``.
    :param max_tries: maximum number of runs per command, first run included.
    :param n_parallel: maximum number of concurrently running subprocesses.
    :param poll_interval: seconds to sleep between two status polls while
        nothing else can be done.
    :raises ValueError: if ``n_parallel`` is smaller than 1.
    :return: ``None``; progress is reported via ``print``.
    """
    if n_parallel < 1:
        # Without a single slot nothing could ever be started and the
        # scheduling loop below would never terminate.
        raise ValueError('n_parallel must be at least 1')

    # waiting holds (command, tries); tries is the number of the run that
    # the entry is about to get, starting at 1.
    waiting = deque((command, 1) for command in commands)
    # running holds (process, command, tries) for every live subprocess.
    running = deque()

    while waiting or running:
        print(f'Running: {len(running)}, Waiting: {len(waiting)}')

        # fill the free slots with waiting jobs
        while waiting and len(running) < n_parallel:
            command, tries = waiting.popleft()
            try:
                process = subprocess.Popen(command)
            except OSError:
                print(f'Failed to start command {command}')
            else:
                running.append((process, command, tries))
                print(f"Started task {command}")

        # poll running commands, keeping only those that are still alive
        still_running = deque()
        for process, command, tries in running:
            ret = process.poll()
            if ret is None:
                still_running.append((process, command, tries))
            elif ret == 0:
                print(f'Command {command} finished successfully')
            elif tries < max_tries:
                # retry errored jobs
                waiting.append((command, tries + 1))
            else:
                print(f'Command: {command} errored after {max_tries} tries')
        running = still_running

        # Sleep a bit to reduce CPU usage, but only when waiting is all that
        # is left to do: not after the last job finished, and not while a
        # queued job could be started right away.
        can_start_more = bool(waiting) and len(running) < n_parallel
        if running and not can_start_more:
            time.sleep(poll_interval)
    print('All tasks done')

# Demo: run a small mixed workload with up to 3 tries per command and at
# most 3 subprocesses running at the same time.
if __name__ == '__main__':
    commands = [
        ['echo', 'foo'],
        ['echo', 'bar'],
        # cannot be started at all: exercises the "Failed to start" path
        ['non-existing-command'],
        # longer-running jobs, so that the parallel slots fill up
        ['sleep', '2'],
        ['sleep', '3'],
        ['sleep', '2'],
        # starts fine but exits non-zero (no such file): exercises the retries
        ['ls', 'asnddlksandaslk'],
    ]
    run_bash_commands_in_parallel(commands, max_tries=3, n_parallel=3)

结果:

python run_stuff.py
Running: 0, Waiting: 7
Started task ['echo', 'foo']
foo
Started task ['echo', 'bar']
bar
Failed to start command ['non-existing-command']
Started task ['sleep', '2']
Command ['echo', 'foo'] finished successfully
Command ['echo', 'bar'] finished successfully
Running: 1, Waiting: 3
Started task ['sleep', '3']
Started task ['sleep', '2']
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Running: 3, Waiting: 1
Command ['sleep', '2'] finished successfully
Running: 2, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '2'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 2, Waiting: 0
Running: 1, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
Command ['sleep', '3'] finished successfully
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Running: 0, Waiting: 1
Started task ['ls', 'asnddlksandaslk']
ls: cannot access 'asnddlksandaslk': No such file or directory
Running: 1, Waiting: 0
Command: ['ls', 'asnddlksandaslk'] errored after 3 tries
All tasks done