Return异常触发后的处理结果

Return result of process after an exception is triggered

我有一个多处理设置,它通过将所有计算值附加到 lst 来处理较长的 运行 任务。它看起来大致是这样的:

from multiprocessing import Pool
from time import sleep


def fun(_):
    lst = []  # list that will be returned
    for i in range(200):
        lst.append(i)
        if not i % 10:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    return lst


if __name__ == '__main__':
    master = []
    processes = 2
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
    print(master)

我希望能够引发 KeyboardInterrupt 并让进程 return 他们处理的列表,即使它们尚未完成,因为每次迭代都只是添加一个新的子列表。 (我的实际数据看起来大致像 lst = ([], [[], ...], [[], ...]),每个空列表只包含整数,实际函数将 return lst1, lst2, lst3

我试过将整个主要部分包裹在 try: except: 中,如下所示:

try:
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
except KeyboardInterrupt:
    # somehow retrieve the values here
    pass

然而,我还没有通过这种方式得出任何可能的解决方案。 我怎样才能告诉进程是时候提前退出了,并且 return 我知道他们当前的结果?

编辑以显示实际结构: main.py:


from other import Other

class Something:
    def __init__(self):
        pass  # stuff here
    
    def spawner(self):
        for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
            pass  # do stuff with the data

    def loop(self, _):
        # setup stuff
        Other(setup_stuff).start()

other.py


class Other:
    def __init__(self):
        pass  # more stuff

    def start(self):
        lst1, lst2, lst3 = [], [], []
        for _ in range(self.episodes):
            pass  # do the actual computation
        return lst1, lst2, lst3

也许您可以使用 multiprocessing.Queue 而不是 list 到 return 变量。一开始设置一个队列,所有进程都会写入队列。

最后,从队列中读取所有值。

from time import sleep
from multiprocessing import Pool, Queue

q = None


def set_global_data(queue):
    global q
    q = queue


def fun(_):
    for i in range(200):
        q.put_nowait(i)
        if not i % 10:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    # nothing is returned


if __name__ == "__main__":
    master = Queue()
    processes = 2

    try:
        with Pool(processes, set_global_data, (master,)) as p:
            for result in p.imap_unordered(fun, range(processes)):
                pass
    except KeyboardInterrupt:
        pass

    while not master.empty():
        v = master.get_nowait()
        print(v)

编辑:有多个文件:

main.py

from other import Other
from multiprocessing import Pool, Queue


class Something:
    def __init__(self):
        pass  # stuff here

    def spawner(self):
        master = Queue()

        try:
            with Pool(2, Something.set_global_data, (master,)) as p:
                for _ in p.imap_unordered(self.loop, range(2)):
                    pass
        except KeyboardInterrupt:
            pass

        while not master.empty():
            v = master.get_nowait()
            print(v)

    def loop(self, _):
        # setup stuff
        Other().start()

    @staticmethod
    def set_global_data(queue):
        Other.q = queue


s = Something()
s.spawner()

other.py

from time import sleep


class Other:
    q = None

    def __init__(self):
        pass  # more stuff

    def start(self):
        for i in range(200):
            Other.q.put_nowait(i)
            if not i % 10:
                sleep(0.1)