python pool.map_async 不等待 .wait()
python pool.map_async doesn't wait for .wait()
我正在尝试 运行 一个与 pool.apply_async
一起工作得很好的胖函数
现在,我正在尝试 pool.map_async 函数(使用 functools.partial 方法传递了 2 个参数)并且程序立即完成,没有任何错误或异常...
files = list_files(mypath) # list of files to process
csv_rows = None
result = mp.Queue() #result queue from multiprocessing module
pool = mp.Pool(4)
t = pool.map_async( partial(process_file, result), files)
t.wait() # it doesn't wait HERE ... program exits immediately - no errors
关于我可能遗漏了什么的任何线索?
首先,如果您要立即 wait
,您可能不需要 map_async
。如果是这样,那么只需使用 map
。您也可以删除 queue
而只删除 return 值。但这可能不是您遇到的问题。
问题很可能是 wait
方法 没有引发远程异常 。您的 process_file
方法很可能实际上在池进程中失败了,但您没有看到这些异常。就像提到的 Blckknght 一样,您应该切换到使用 get
方法,如您所见 here,该方法将引发远程异常。这是一个简单的示例,其中 wait
方法隐藏了远程进程异常,以及如果您切换到 get
如何再次看到它们:
import multiprocessing as mp
def just_dies(n):
raise ValueError("Test")
if __name__ == "__main__":
pool = mp.Pool(4)
results = pool.map_async(just_dies, range(10))
# the wait will immediately silently pass
#results.wait()
# this will actually raise the remote exception
results.get()
如果你运行这样做,你会收到这样的回溯错误消息
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "foo.py", line 14, in <module>
results.get()
File "XXX/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
ValueError: Test
如果您切换到使用 wait
方法,那么您将看不到它。
我正在尝试 运行 一个与 pool.apply_async
一起工作得很好的胖函数现在,我正在尝试 pool.map_async 函数(使用 functools.partial 方法传递了 2 个参数)并且程序立即完成,没有任何错误或异常...
files = list_files(mypath) # list of files to process
csv_rows = None
result = mp.Queue() #result queue from multiprocessing module
pool = mp.Pool(4)
t = pool.map_async( partial(process_file, result), files)
t.wait() # it doesn't wait HERE ... program exits immediately - no errors
关于我可能遗漏了什么的任何线索?
首先,如果您要立即 wait
,您可能不需要 map_async
。如果是这样,那么只需使用 map
。您也可以删除 queue
而只删除 return 值。但这可能不是您遇到的问题。
问题很可能是 wait
方法 没有引发远程异常 。您的 process_file
方法很可能实际上在池进程中失败了,但您没有看到这些异常。就像提到的 Blckknght 一样,您应该切换到使用 get
方法,如您所见 here,该方法将引发远程异常。这是一个简单的示例,其中 wait
方法隐藏了远程进程异常,以及如果您切换到 get
如何再次看到它们:
import multiprocessing as mp
def just_dies(n):
raise ValueError("Test")
if __name__ == "__main__":
pool = mp.Pool(4)
results = pool.map_async(just_dies, range(10))
# the wait will immediately silently pass
#results.wait()
# this will actually raise the remote exception
results.get()
如果你运行这样做,你会收到这样的回溯错误消息
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "foo.py", line 14, in <module>
results.get()
File "XXX/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
ValueError: Test
如果您切换到使用 wait
方法,那么您将看不到它。