Return异常触发后的处理结果
Return result of process after an exception is triggered
我有一个多处理设置,它通过将所有计算值附加到 lst
来处理较长的 运行 任务。它看起来大致是这样的:
from multiprocessing import Pool
from time import sleep
def fun(_):
lst = [] # list that will be returned
for i in range(200):
lst.append(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
return lst
if __name__ == '__main__':
master = []
processes = 2
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
print(master)
我希望能够引发 KeyboardInterrupt
并让进程 return 他们处理的列表,即使它们尚未完成,因为每次迭代都只是添加一个新的子列表。
(我的实际数据看起来大致像 lst = ([], [[], ...], [[], ...])
,每个空列表只包含整数,实际函数将 return lst1, lst2, lst3
)
我试过将整个主要部分包裹在 try: except:
中,如下所示:
try:
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
except KeyboardInterrupt:
# somehow retrieve the values here
pass
然而,我还没有通过这种方式得出任何可能的解决方案。
我怎样才能告诉进程是时候提前退出了,并且 return 我知道他们当前的结果?
编辑以显示实际结构:
main.py:
from other import Other
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
pass # do stuff with the data
def loop(self, _):
# setup stuff
Other(setup_stuff).start()
other.py
class Other:
def __init__(self):
pass # more stuff
def start(self):
lst1, lst2, lst3 = [], [], []
for _ in range(self.episodes):
pass # do the actual computation
return lst1, lst2, lst3
也许您可以使用 multiprocessing.Queue
而不是 list
到 return 变量。一开始设置一个队列,所有进程都会写入队列。
最后,从队列中读取所有值。
from time import sleep
from multiprocessing import Pool, Queue
q = None
def set_global_data(queue):
global q
q = queue
def fun(_):
for i in range(200):
q.put_nowait(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
# nothing is returned
if __name__ == "__main__":
master = Queue()
processes = 2
try:
with Pool(processes, set_global_data, (master,)) as p:
for result in p.imap_unordered(fun, range(processes)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
编辑:有多个文件:
main.py
from other import Other
from multiprocessing import Pool, Queue
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
master = Queue()
try:
with Pool(2, Something.set_global_data, (master,)) as p:
for _ in p.imap_unordered(self.loop, range(2)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
def loop(self, _):
# setup stuff
Other().start()
@staticmethod
def set_global_data(queue):
Other.q = queue
s = Something()
s.spawner()
other.py
from time import sleep
class Other:
q = None
def __init__(self):
pass # more stuff
def start(self):
for i in range(200):
Other.q.put_nowait(i)
if not i % 10:
sleep(0.1)
我有一个多处理设置,它通过将所有计算值附加到 lst
来处理较长的 运行 任务。它看起来大致是这样的:
from multiprocessing import Pool
from time import sleep
def fun(_):
lst = [] # list that will be returned
for i in range(200):
lst.append(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
return lst
if __name__ == '__main__':
master = []
processes = 2
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
print(master)
我希望能够引发 KeyboardInterrupt
并让进程 return 他们处理的列表,即使它们尚未完成,因为每次迭代都只是添加一个新的子列表。
(我的实际数据看起来大致像 lst = ([], [[], ...], [[], ...])
,每个空列表只包含整数,实际函数将 return lst1, lst2, lst3
)
我试过将整个主要部分包裹在 try: except:
中,如下所示:
try:
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
except KeyboardInterrupt:
# somehow retrieve the values here
pass
然而,我还没有通过这种方式得出任何可能的解决方案。 我怎样才能告诉进程是时候提前退出了,并且 return 我知道他们当前的结果?
编辑以显示实际结构: main.py:
from other import Other
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
pass # do stuff with the data
def loop(self, _):
# setup stuff
Other(setup_stuff).start()
other.py
class Other:
def __init__(self):
pass # more stuff
def start(self):
lst1, lst2, lst3 = [], [], []
for _ in range(self.episodes):
pass # do the actual computation
return lst1, lst2, lst3
也许您可以使用 multiprocessing.Queue
而不是 list
到 return 变量。一开始设置一个队列,所有进程都会写入队列。
最后,从队列中读取所有值。
from time import sleep
from multiprocessing import Pool, Queue
q = None
def set_global_data(queue):
global q
q = queue
def fun(_):
for i in range(200):
q.put_nowait(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
# nothing is returned
if __name__ == "__main__":
master = Queue()
processes = 2
try:
with Pool(processes, set_global_data, (master,)) as p:
for result in p.imap_unordered(fun, range(processes)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
编辑:有多个文件:
main.py
from other import Other
from multiprocessing import Pool, Queue
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
master = Queue()
try:
with Pool(2, Something.set_global_data, (master,)) as p:
for _ in p.imap_unordered(self.loop, range(2)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
def loop(self, _):
# setup stuff
Other().start()
@staticmethod
def set_global_data(queue):
Other.q = queue
s = Something()
s.spawner()
other.py
from time import sleep
class Other:
q = None
def __init__(self):
pass # more stuff
def start(self):
for i in range(200):
Other.q.put_nowait(i)
if not i % 10:
sleep(0.1)