为什么我不能在 python 多处理中关闭池之前使用 join()
Why can't I use join() before closing pool in python multiprocessing
我有一个 class 有一个方法可以进行一些并行计算并且经常被调用。因此,我希望我的池在 class 的构造函数中初始化一次,而不是在每次调用此方法时都创建一个新池。在这个方法中,我想使用 apply_async() 为所有工作进程启动一个任务,然后等待(阻塞)并聚合每个任务的结果。我的代码如下所示:
class Foo:
def __init__(self, ...):
# ...
self.pool = mp.Pool(mp.cpu_count())
def do_parallel_calculations(self, ...):
for _ in range(mp.cpu_count()):
self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)
# wait for results to be aggregated to a global var by the callback
self.pool.join() # <-- ValueError: Pool is still running
# do something with the aggregated result of all worker processes
但是,当我 运行 执行此操作时,我在 self.pool.join() 中收到错误消息:“ValueError:Pool is still 运行ning”。现在,在我看到的所有示例中, self.pool.close() 在 self.pool.join() 之前被调用,我认为这就是我收到此错误的原因,但我不想在那里关闭我的池下次调用此方法!我不能不使用 self.pool.join() 因为我需要一种方法来等待所有进程完成并且我不想浪费手动旋转例如通过使用“while not global_flag: pass ".
我可以做些什么来实现我想要做的事情?为什么 multiprocessing 不让我加入一个仍然开放的池?这似乎是一件非常合理的事情。
我想我设法通过在 apply_async() returns 的 AsyncResult 对象上调用 get() 来做到这一点。所以代码变成:
def do_parallel_calculations(self, ...):
results = []
for _ in range(mp.cpu_count()):
results.append(self.pool.apply_async(calc_func, args=(...)))
aggregated_result = 0
for result in results:
aggregated_result += result.get()
其中 calc_func() returns 单个任务结果,不需要回调和全局变量。
这并不理想,因为我以任意顺序等待它们,而不是按照它们实际完成的顺序(最有效的方法是减少结果)但是因为我只有 4 个内核,所以应该几乎不引人注意。
让我们用一个真实的例子来具体说明:
import multiprocessing as mp
def calc_func(x):
return x * x
class Foo:
def __init__(self):
self.pool = mp.Pool(mp.cpu_count())
def do_parallel_calculations(self, values):
results = []
for value in values:
results.append(self.pool.apply_async(calc_func, args=(value,)))
for result in results:
print(result.get())
if __name__ == '__main__':
foo = Foo()
foo.do_parallel_calculations([1,2,3])
我有一个 class 有一个方法可以进行一些并行计算并且经常被调用。因此,我希望我的池在 class 的构造函数中初始化一次,而不是在每次调用此方法时都创建一个新池。在这个方法中,我想使用 apply_async() 为所有工作进程启动一个任务,然后等待(阻塞)并聚合每个任务的结果。我的代码如下所示:
class Foo:
def __init__(self, ...):
# ...
self.pool = mp.Pool(mp.cpu_count())
def do_parallel_calculations(self, ...):
for _ in range(mp.cpu_count()):
self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)
# wait for results to be aggregated to a global var by the callback
self.pool.join() # <-- ValueError: Pool is still running
# do something with the aggregated result of all worker processes
但是,当我 运行 执行此操作时,我在 self.pool.join() 中收到错误消息:“ValueError:Pool is still 运行ning”。现在,在我看到的所有示例中, self.pool.close() 在 self.pool.join() 之前被调用,我认为这就是我收到此错误的原因,但我不想在那里关闭我的池下次调用此方法!我不能不使用 self.pool.join() 因为我需要一种方法来等待所有进程完成并且我不想浪费手动旋转例如通过使用“while not global_flag: pass ".
我可以做些什么来实现我想要做的事情?为什么 multiprocessing 不让我加入一个仍然开放的池?这似乎是一件非常合理的事情。
我想我设法通过在 apply_async() returns 的 AsyncResult 对象上调用 get() 来做到这一点。所以代码变成:
def do_parallel_calculations(self, ...):
results = []
for _ in range(mp.cpu_count()):
results.append(self.pool.apply_async(calc_func, args=(...)))
aggregated_result = 0
for result in results:
aggregated_result += result.get()
其中 calc_func() returns 单个任务结果,不需要回调和全局变量。
这并不理想,因为我以任意顺序等待它们,而不是按照它们实际完成的顺序(最有效的方法是减少结果)但是因为我只有 4 个内核,所以应该几乎不引人注意。
让我们用一个真实的例子来具体说明:
import multiprocessing as mp
def calc_func(x):
return x * x
class Foo:
def __init__(self):
self.pool = mp.Pool(mp.cpu_count())
def do_parallel_calculations(self, values):
results = []
for value in values:
results.append(self.pool.apply_async(calc_func, args=(value,)))
for result in results:
print(result.get())
if __name__ == '__main__':
foo = Foo()
foo.do_parallel_calculations([1,2,3])