在 for 循环中 Python 中进行多处理并传递多个参数
Mutliprocessing in Python in a for loop and passing multiple Arguments
我正在使用 python 脚本进行大量计算。因为它是 CPU-bound,所以我通常使用线程模块的方法没有产生任何性能改进。
我现在尝试使用多处理而不是多线程来更好地使用我的 CPU 并加快冗长的计算。
我在 Whosebug 上找到了一些示例代码,但我没有让脚本接受多个参数。有人可以帮我解决这个问题吗?我以前从未使用过这些模块,我很确定我使用的 Pool.map 是错误的。 - 任何帮助表示赞赏。也欢迎使用其他方法来完成多处理。
from multiprocessing import Pool
def calculation(foo, bar, foobar, baz):
# Do a lot of calculations based on the variables
# Later the result is written to a file.
result = foo * bar * foobar * baz
print(result)
if __name__ == '__main__':
for foo in range(3):
for bar in range(5):
for baz in range(4):
for foobar in range(10):
Pool.map(calculation, foo, bar, foobar, baz)
Pool.close()
Pool.join()
正如您所怀疑的那样,您使用 map
错误的方式不止一种。
map
的要点是对可迭代对象的所有元素调用一个函数。就像内置的 map
function, but in parallel. If you want queue a single call, just use apply_async
.
对于您具体询问的问题:map
需要一个 single-argument 函数。如果你想传递多个参数,你可以修改或包装你的函数以采用单个元组而不是多个参数(我将在最后显示),或者只使用 starmap
。或者,如果你想使用 apply_async
,它接受多个参数的函数,但你传递 apply_async
一个参数元组,而不是单独的参数。
- 您需要在
Pool
实例上调用 map
,而不是 Pool
class。您要做的类似于尝试 read
从文件类型而不是从特定的打开文件中读取。
- 您正试图在每次迭代后关闭并加入
Pool
。在完成所有这些之前,您不想这样做,否则您的代码将只等待第一个完成,然后为第二个引发异常。
因此,可行的最小更改是:
if __name__ == '__main__':
pool = Pool()
for foo in range(3):
for bar in range(5):
for baz in range(4):
for foobar in range(10):
pool.apply_async(calculation, (foo, bar, foobar, baz))
pool.close()
pool.join()
请注意,我将所有内容都保存在 if __name__ == '__main__':
块中——包括新的 Pool()
构造函数。我不会在后面的示例中展示这一点,但由于文档的 Programming guidelines 部分中解释的原因,这对所有示例都是必需的。1
如果您想使用 map
函数之一,则需要一个充满参数的可迭代对象,如下所示:
pool = Pool()
args = ((foo, bar, foobar, baz)
for foo in range(3)
for bar in range(5)
for baz in range(4)
for foobar in range(10))
pool.starmap(calculation, args)
pool.close()
pool.join()
或者,更简单地说:
pool = Pool()
pool.starmap(calculate, itertools.product(range(3), range(5), range(4), range(10)))
pool.close()
pool.join()
假设您没有使用旧版本的 Python,您可以通过在 with
语句中使用 Pool
来进一步简化它:
with Pool() as pool:
pool.starmap(calculate,
itertools.product(range(3), range(5), range(4), range(10)))
使用 map
或 starmap
的一个问题是它会做额外的工作以确保您按顺序返回结果。但是您只是返回 None
并忽略了它,那么为什么这样做呢?
使用apply_async
没有这个问题。
您也可以将 map
替换为 imap_unordered
,但是没有 istarmap_unordered
,因此您需要包装您的函数以不需要 starmap
:
def starcalculate(args):
return calculate(*args)
with Pool() as pool:
pool.imap_unordered(starcalculate,
itertools.product(range(3), range(5), range(4), range(10)))
1.如果您使用的是 spawn
或 forkserver
启动方法——并且 spawn
是 Windows 的默认值——每个 child 进程都相当于 import
你的模块。因此,所有不受 __main__
守卫保护的 top-level 代码将在每个 child 中获得 运行。该模块试图保护您免受一些最坏的后果(例如,不是用 children 的指数爆炸来创建新的 children,您通常会得到一个异常,而不是用叉子轰炸您的计算机),但它不能使代码实际工作。
我正在使用 python 脚本进行大量计算。因为它是 CPU-bound,所以我通常使用线程模块的方法没有产生任何性能改进。
我现在尝试使用多处理而不是多线程来更好地使用我的 CPU 并加快冗长的计算。
我在 Whosebug 上找到了一些示例代码,但我没有让脚本接受多个参数。有人可以帮我解决这个问题吗?我以前从未使用过这些模块,我很确定我使用的 Pool.map 是错误的。 - 任何帮助表示赞赏。也欢迎使用其他方法来完成多处理。
from multiprocessing import Pool
def calculation(foo, bar, foobar, baz):
# Do a lot of calculations based on the variables
# Later the result is written to a file.
result = foo * bar * foobar * baz
print(result)
if __name__ == '__main__':
for foo in range(3):
for bar in range(5):
for baz in range(4):
for foobar in range(10):
Pool.map(calculation, foo, bar, foobar, baz)
Pool.close()
Pool.join()
正如您所怀疑的那样,您使用 map
错误的方式不止一种。
map
的要点是对可迭代对象的所有元素调用一个函数。就像内置的map
function, but in parallel. If you want queue a single call, just useapply_async
.对于您具体询问的问题:
map
需要一个 single-argument 函数。如果你想传递多个参数,你可以修改或包装你的函数以采用单个元组而不是多个参数(我将在最后显示),或者只使用starmap
。或者,如果你想使用apply_async
,它接受多个参数的函数,但你传递apply_async
一个参数元组,而不是单独的参数。- 您需要在
Pool
实例上调用map
,而不是Pool
class。您要做的类似于尝试read
从文件类型而不是从特定的打开文件中读取。 - 您正试图在每次迭代后关闭并加入
Pool
。在完成所有这些之前,您不想这样做,否则您的代码将只等待第一个完成,然后为第二个引发异常。
因此,可行的最小更改是:
if __name__ == '__main__':
pool = Pool()
for foo in range(3):
for bar in range(5):
for baz in range(4):
for foobar in range(10):
pool.apply_async(calculation, (foo, bar, foobar, baz))
pool.close()
pool.join()
请注意,我将所有内容都保存在 if __name__ == '__main__':
块中——包括新的 Pool()
构造函数。我不会在后面的示例中展示这一点,但由于文档的 Programming guidelines 部分中解释的原因,这对所有示例都是必需的。1
如果您想使用 map
函数之一,则需要一个充满参数的可迭代对象,如下所示:
pool = Pool()
args = ((foo, bar, foobar, baz)
for foo in range(3)
for bar in range(5)
for baz in range(4)
for foobar in range(10))
pool.starmap(calculation, args)
pool.close()
pool.join()
或者,更简单地说:
pool = Pool()
pool.starmap(calculate, itertools.product(range(3), range(5), range(4), range(10)))
pool.close()
pool.join()
假设您没有使用旧版本的 Python,您可以通过在 with
语句中使用 Pool
来进一步简化它:
with Pool() as pool:
pool.starmap(calculate,
itertools.product(range(3), range(5), range(4), range(10)))
使用 map
或 starmap
的一个问题是它会做额外的工作以确保您按顺序返回结果。但是您只是返回 None
并忽略了它,那么为什么这样做呢?
使用apply_async
没有这个问题。
您也可以将 map
替换为 imap_unordered
,但是没有 istarmap_unordered
,因此您需要包装您的函数以不需要 starmap
:
def starcalculate(args):
return calculate(*args)
with Pool() as pool:
pool.imap_unordered(starcalculate,
itertools.product(range(3), range(5), range(4), range(10)))
1.如果您使用的是 spawn
或 forkserver
启动方法——并且 spawn
是 Windows 的默认值——每个 child 进程都相当于 import
你的模块。因此,所有不受 __main__
守卫保护的 top-level 代码将在每个 child 中获得 运行。该模块试图保护您免受一些最坏的后果(例如,不是用 children 的指数爆炸来创建新的 children,您通常会得到一个异常,而不是用叉子轰炸您的计算机),但它不能使代码实际工作。