Pool.apply_async(): 嵌套函数没有执行
Pool.apply_async(): nested function is not executed
我正在熟悉 Python 的 multiprocessing
模块。以下代码按预期工作:
#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
然而,现在,如果我在上面的代码周围包装一个函数,print
语句不会执行(或者至少重定向输出):
#outputs nothing
def run():
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
如果我将 run_one
定义移到 run
之外,当我调用 run()
:
时,输出又是预期的
#outputs 0 1 2 3
def run_one(x):
print x
return
def run():
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
我在这里错过了什么?为什么第二个片段不打印任何东西?如果我简单地调用 run_one(i)
函数而不是使用 apply_async
,所有三个代码输出相同。
池需要腌制(序列化)它发送给 worker-processes 的所有内容。 Pickling 实际上只保存函数的名称,而 unpickling 需要 re-importing 按名称保存函数。
为此,函数需要在 top-level 处定义,嵌套函数将不会被子项导入,并且已经尝试 pickle 它们会引发异常:
from multiprocessing.connection import _ForkingPickler
def run():
def foo(x):
pass
_ForkingPickler.dumps(foo) # multiprocessing custom pickler;
# same effect with pickle.dumps(foo)
run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'
你看不到异常的原因是,因为 Pool
已经开始在父级的酸洗任务期间捕获异常,并且只有 re-raises 当你调用 .get()
调用 pool.apply_async()
.
时立即获得的 AsyncResult
对象
这就是为什么(使用 Python 2)你最好总是这样使用它,即使你的 target-function 没有 return 任何东西(仍然 returns 隐式None
):
results = [pool.apply_async(foo, (i,)) for i in range(4)]
# `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
for res in results:
res.get()
Non-async Pool-methods 与 Pool.map()
和 Pool.starmap()
一样,在后台使用相同的(异步)low-level 函数,就像它们的异步兄弟一样,但它们另外为您调用 .get()
,因此您将始终看到这些方法的异常。
Python 3 有一个用于异步的 error_callback
参数 Pool-methods 你可以用它来处理异常。
我正在熟悉 Python 的 multiprocessing
模块。以下代码按预期工作:
#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
然而,现在,如果我在上面的代码周围包装一个函数,print
语句不会执行(或者至少重定向输出):
#outputs nothing
def run():
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
如果我将 run_one
定义移到 run
之外,当我调用 run()
:
#outputs 0 1 2 3
def run_one(x):
print x
return
def run():
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
我在这里错过了什么?为什么第二个片段不打印任何东西?如果我简单地调用 run_one(i)
函数而不是使用 apply_async
,所有三个代码输出相同。
池需要腌制(序列化)它发送给 worker-processes 的所有内容。 Pickling 实际上只保存函数的名称,而 unpickling 需要 re-importing 按名称保存函数。 为此,函数需要在 top-level 处定义,嵌套函数将不会被子项导入,并且已经尝试 pickle 它们会引发异常:
from multiprocessing.connection import _ForkingPickler
def run():
def foo(x):
pass
_ForkingPickler.dumps(foo) # multiprocessing custom pickler;
# same effect with pickle.dumps(foo)
run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'
你看不到异常的原因是,因为 Pool
已经开始在父级的酸洗任务期间捕获异常,并且只有 re-raises 当你调用 .get()
调用 pool.apply_async()
.
AsyncResult
对象
这就是为什么(使用 Python 2)你最好总是这样使用它,即使你的 target-function 没有 return 任何东西(仍然 returns 隐式None
):
results = [pool.apply_async(foo, (i,)) for i in range(4)]
# `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
for res in results:
res.get()
Non-async Pool-methods 与 Pool.map()
和 Pool.starmap()
一样,在后台使用相同的(异步)low-level 函数,就像它们的异步兄弟一样,但它们另外为您调用 .get()
,因此您将始终看到这些方法的异常。
Python 3 有一个用于异步的 error_callback
参数 Pool-methods 你可以用它来处理异常。