Unexpected behaviour of multiprocessing Pool map_async
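I have some code that performs the same operation on several files in a Python 3 application, so it seems like a good fit for multiprocessing. I'm trying to use Pool to distribute the work across a number of processes. I want the code to keep doing other things (mainly displaying things to the user) while these calculations run, so I'd like to use the map_async function of multiprocessing.Pool for this. I expected that after calling it the code would continue and the results would be handled by the callback I specified, but that doesn't seem to happen. The following code shows three ways I've tried calling map_async and the results I've seen: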
import multiprocessing
import time

NUM_PROCS = 4


def func(arg_list):
    arg1 = arg_list[0]
    arg2 = arg_list[1]
    print('start func')
    print('arg1 = {0}'.format(arg1))
    print('arg2 = {0}'.format(arg2))
    time.sleep(1)
    result1 = arg1 * arg2
    print('end func')
    return result1


def callback(result):
    print('result is {0}'.format(result))


def error_handler(error1):
    print('error in call\n {0}'.format(error1))


def async1(arg_list1):
    # This is how my understanding of map_async suggests I should
    # call it. When I execute this, the target function func() is not called
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)


def async2(arg_list1):
    with multiprocessing.Pool(NUM_PROCS) as p1:
        # If I call the wait function on the result for a small
        # amount of time, then the target function func() is called
        # and executes successfully in 2 processes, but the callback
        # function is never called so the results are not processed
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        r1.wait(0.1)


def async3(arg_list1):
    # If I explicitly call join on the pool, then the target function func()
    # successfully executes in 2 processes and the callback function is also
    # called, but by calling join the processing is not asynchronous any more
    # as join blocks the main process until the other processes are finished.
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        p1.close()
        p1.join()


def main():
    arg_list1 = [(5, 3), (7, 4), (-8, 10), (4, 12)]
    async3(arg_list1)
    print('pool executed successfully')


if __name__ == '__main__':
    main()
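When async1, async2 or async3 is called from main, the results are as described in the comments of each function. Can anyone explain why the different calls behave the way they do? Ultimately I'd like to call map_async as in async1, so that I can do something in the main process while the worker processes are busy. I have tested this code with Python 2.7 and 3.6, on an older RH6 Linux box and a newer Ubuntu VM, with the same results.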
This happens because when you use multiprocessing.Pool as a context manager, pool.terminate() is called when you leave the with block, which immediately exits all workers without waiting for in-progress tasks to finish.
New in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
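A minimal sketch of how this can be observed, assuming a Linux-style setup where worker functions defined in the main script are usable by the pool. The names slow_square and leave_block_early are made up for illustration. The result object is still not ready after the with block has exited, because the workers were stopped before they could finish:

```python
import multiprocessing
import time


def slow_square(x):
    # Hypothetical worker: slow enough that it cannot finish before
    # the with block below is exited.
    time.sleep(1)
    return x * x


def leave_block_early():
    # Leaving the with block calls terminate(), which stops the workers
    # without waiting for outstanding tasks.
    with multiprocessing.Pool(2) as pool:
        result = pool.map_async(slow_square, [1, 2, 3])
    # Give the (already abandoned) tasks a moment, then check the result.
    result.wait(0.5)
    return result.ready()


if __name__ == '__main__':
    print(leave_block_early())
```

Calling result.get() without a timeout at that point would be expected to block, since nothing is left to deliver the result.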
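IMO using terminate() as the __exit__ method of the context manager wasn't a great design choice, since it seems most people intuitively expect close() to be called, which would wait for in-progress tasks to complete before exiting. Unfortunately, all you can do is refactor your code away from using a context manager, or refactor it to guarantee that you don't leave the with block until the Pool has finished its work.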
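One possible shape for the first option is sketched below. It is an illustration, not the only way to do it: multiply and run_in_background are hypothetical names, the polling loop stands in for whatever the main process needs to do (such as updating the display), and the sleep durations are arbitrary.

```python
import multiprocessing
import time


def multiply(pair):
    # Same computation as func() in the question, without the prints.
    a, b = pair
    time.sleep(0.2)
    return a * b


def run_in_background(pairs, num_procs=4):
    collected = []
    # Create the pool without a with block so nothing terminates it early.
    pool = multiprocessing.Pool(num_procs)
    try:
        async_result = pool.map_async(multiply, pairs,
                                      callback=collected.extend)
        pool.close()  # no more tasks will be submitted
        # The main process stays free while the workers are busy.
        while not async_result.ready():
            time.sleep(0.05)  # placeholder for other main-process work
        pool.join()  # the work is already done, so this returns quickly
    finally:
        pool.terminate()  # cleans up the workers if something went wrong
    return collected


if __name__ == '__main__':
    print(run_in_background([(5, 3), (7, 4), (-8, 10), (4, 12)]))
```

The callback receives the complete list of results in one call, in the same order as the inputs. In a GUI application the while loop would more likely be replaced by a periodic check of async_result.ready() from the toolkit's own event loop.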