如何在命令行中为池进程创建进度条?

How to create a progress bar in command line for pool processes?

我有几个脚本,我 运行 使用多处理池 我正在尝试根据已完成的脚本制作一个进度条。

我查了

但我不知道如何组合在柜台完成的脚本

import os                                                                       
from multiprocessing import Pool

def run_process(process):                                                             
    os.system('python {}'.format(process))

processes = ('script1.py', 'script2.py','script3.py','script4.py')

if __name__ == "__main__":

    pool = Pool(processes=2)
    pool.map(run_process, processes)

您可以使用 pool.apply_async() 来完成,因为它支持回调函数,可用于了解目标函数何时返回。

我使用@Greenstick 的 answer 来显示进度条,但我主要修改它以符合 PEP-8 编码指南,并将它放在一个名为 print_progress_bar 的单独模块中 — 见下文。

性能说明: 虽然 可以 使用 multiprocessing.Pool 来做到这一点——我强烈怀疑你问题中的代码是文章 How to run parallel processes 中内容的逐字副本——这样做效率极低,因为每个进程都会初始化自己的 Python 解释器 double 真正需要的次数。首先执行 run_process() 函数本身,然后再次执行 运行 脚本过程。

生成过程涉及相当多的开销。这种开销可以通过 运行ning run_process() 作为 current 进程中的单独线程来减轻,这是一个更轻量级的进程。

切换到 ThreadPool 非常简单,只需更改行:
from multiprocessing import Pool

from multiprocessing.pool import ThreadPool as Pool

或者您可以使用 concurrent.futures.ThreadPoolExecutor,如我的 所示。

import os
from multiprocessing import Pool
import subprocess
import sys

from print_progress_bar import print_progress_bar
progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)

def run_process(process):
     os.system('{} {}'.format(sys.executable,  process))

def callback(_):
    """Update process count and progress bar."""
    global process_count
    process_count += 1
    print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0

if __name__ == '__main__':

    print_progress_bar(0, len(processes), **progress_bar_kwargs) # Print 0% progress.

    with Pool(processes=2) as pool:
        results = []
        for process in processes:
            r = pool.apply_async(run_process, (process,), {}, callback)
            results.append(r)

        while results:  # Processes still running?
            results = [r for r in results if not r.ready()]

    print('Done')

print_progress_bar.py:

# from 
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
                       fill='█', print_end="\r"):
    """ Print iterations progress.

        Call in a loop to create terminal progress bar
        @params:
            iteration   - Required  : current iteration (Int)
            total       - Required  : total iterations (Int)
            prefix      - Optional  : prefix string (Str)
            suffix      - Optional  : suffix string (Str)
            decimals    - Optional  : positive number of decimals in percent complete (Int)
            length      - Optional  : character length of bar (Int)
            fill        - Optional  : bar fill character (Str)
            print_end   - Optional  : end character (e.g. "\r", "\r\n") (Str)
    """
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end=print_end, flush=True)

    if iteration == total:  # Print newline on completion.
        print(flush=True)

这里有一个稍微不同的方法,它使用 concurrent.futures.ThreadPoolExecutor instead of a multiprocessing.Pool which make it simpler and more efficient than what's in my

注意 它使用与我的其他答案中相同的 print_progress_bar.py 模块。

import concurrent.futures
import os
import subprocess
import sys

from print_progress_bar import print_progress_bar
progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)

# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0


def run_process(process):
    global process_count

    subprocess.run([sys.executable,  process])
    # Update process count and progress bar when it's done.
    process_count += 1
    print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


print_progress_bar(0, len(processes), **progress_bar_kwargs) # Print 0% progress.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:

    future_to_process = {executor.submit(run_process, process): process
                            for process in processes}
    for future in concurrent.futures.as_completed(future_to_process):
        process = future_to_process[future]
        try:
            _ = future.result()
        except Exception as exc:
            print()
            print(f'{process} generated an exception: {exc}')

    print('Done')