Python 多处理 - 了解每个进程进度的最佳方式

Python multiprocessing - best way to understand progress for each process

我想知道我的进程的进度。目前我使用的不是很有效。这是一个 mwe:

import time
from multiprocessing import Pool as ProcessPool
import progressbar
import random

def some_random_calculation(n):
    with progressbar.ProgressBar(max_value=n) as bar:
        for i in range(0,n):
            time.sleep(1)
            bar.update(i)

if __name__=='__main__':

    arguments = [random.randint(4,10) for i in range(4)]

    pool = ProcessPool(4)
    results = pool.map_async(some_random_calculation, arguments)
    print(results.get())
    pool.close() 
    pool.join()

在本例中,我使用的是 progressbar2,但是,当进程超过 1 时,输出会在同一行上不断更新:

您从图像中看到条形是按排序顺序排列的,因为在第一个条形图结束后,其他进程创建了一个新的条形图。当有多个进程时,单个柱在同一行上更新。

我正在寻找解决我的问题的方法,动态更新 n 栏会很酷。但是,可能有一种更聪明的方法来了解不同流程的进度。有什么建议吗?

所以这 到目前为止还不完美 ,如果您想把所有事情都做对的话,这个主题相当复杂。但有一件事是肯定的,你应该从子流程外部监控进度。

最快也可能是最简单的方法是拥有一个 returns 状态的调用函数,并且外部的调控器可以让用户随时了解进度。那看起来像这样:

import os, signal
from threading import Thread, enumerate as t_enumerate
from time import time, sleep
from random import random

clear = lambda: os.system('cls' if os.name=='nt' else 'clear')

def sig_handler(signal, frame):
    for t in t_enumerate():
        if t.getName() != 'MainThread':
            t.stop()
    exit(0)
signal.signal(signal.SIGINT, sig_handler)

class worker(Thread):
    def __init__(self, init_value=0):
        Thread.__init__(self)
        self.init_value = init_value
        self.progress = 0
        self.run_state = True
        self.start() # Start ourselves instead of from outside.

    def poll(self):
        return self.progress

    def stop(self):
        self.run_state = False

    def run(self):
        main_thread = None
        for t in t_enumerate():
            if t.getName() == 'MainThread':
                main_thread = t
                break

        while main_thread and self.run_state and main_thread.isAlive():
            for i in range(0, 100):
                self.init_value *= i
                self.progress = i
                sleep(random())
            break # Yea kinda unessecary while loop. meh..

workers = [worker(0) for i in range(4)]

while len(t_enumerate()) > 1:
    clear()
    for index, worker_handle in enumerate(workers):
        progress = worker_handle.poll()
        print(f'Thread {index} is at {progress}/100.')
    sleep(1)

另一种方法是让每个线程在打印前获取线程池的锁。但这增加了复杂性,对于初学者来说,他们都需要在 打印时间同步,这样他们就不会随意获取锁来打印,但你在其他一些打印其他内容的输出过程的一部分。或者他们会以错误的顺序打印,或者你需要跟踪你应该回溯哪一行来重写..

这里可能会有一位线程专家提供更好的答案,但这是我的两分钱。只需添加一个轮询函数,进行组合状态更新,并忍受调用每个线程所需的非常有限的处理能力。除非您有数千个,否则多次调用不会对性能产生任何影响。