在 for 循环中 Python 中进行多处理并传递多个参数

Mutliprocessing in Python in a for loop and passing multiple Arguments

我正在使用 python 脚本进行大量计算。因为它是 CPU-bound,所以我通常使用线程模块的方法没有产生任何性能改进。

我现在尝试使用多处理而不是多线程来更好地使用我的 CPU 并加快冗长的计算。

我在 Whosebug 上找到了一些示例代码,但我没有让脚本接受多个参数。有人可以帮我解决这个问题吗?我以前从未使用过这些模块,我很确定我使用的 Pool.map 是错误的。 - 任何帮助表示赞赏。也欢迎使用其他方法来完成多处理。

from multiprocessing import Pool

def calculation(foo, bar, foobar, baz):
    # Do a lot of calculations based on the variables
    # Later the result is written to a file.
    result = foo * bar * foobar * baz
    print(result)

if __name__ == '__main__':
    for foo in range(3):
        for bar in range(5):
            for baz in range(4):
                for foobar in range(10):

                    Pool.map(calculation, foo, bar, foobar, baz)
                    Pool.close()
                    Pool.join()

正如您所怀疑的那样,您使用 map 错误的方式不止一种。

  • map的要点是对可迭代对象的所有元素调用一个函数。就像内置的 map function, but in parallel. If you want queue a single call, just use apply_async.

  • 对于您具体询问的问题:map 需要一个 single-argument 函数。如果你想传递多个参数,你可以修改或包装你的函数以采用单个元组而不是多个参数(我将在最后显示),或者只使用 starmap。或者,如果你想使用 apply_async,它接受多个参数的函数,但你传递 apply_async 一个参数元组,而不是单独的参数。

  • 您需要在 Pool 实例上调用 map,而不是 Pool class。您要做的类似于尝试 read 从文件类型而不是从特定的打开文件中读取。
  • 您正试图在每次迭代后关闭并加入 Pool。在完成所有这些之前,您不想这样做,否则您的代码将只等待第一个完成,然后为第二个引发异常。

因此,可行的最小更改是:

if __name__ == '__main__':
    pool = Pool()
    for foo in range(3):
        for bar in range(5):
            for baz in range(4):
                for foobar in range(10):
                    pool.apply_async(calculation, (foo, bar, foobar, baz))
    pool.close()
    pool.join()

请注意,我将所有内容都保存在 if __name__ == '__main__': 块中——包括新的 Pool() 构造函数。我不会在后面的示例中展示这一点,但由于文档的 Programming guidelines 部分中解释的原因,这对所有示例都是必需的。1


如果您想使用 map 函数之一,则需要一个充满参数的可迭代对象,如下所示:

pool = Pool()
args = ((foo, bar, foobar, baz) 
        for foo in range(3) 
        for bar in range(5) 
        for baz in range(4) 
        for foobar in range(10))
pool.starmap(calculation, args)
pool.close()
pool.join()

或者,更简单地说:

pool = Pool()
pool.starmap(calculate, itertools.product(range(3), range(5), range(4), range(10)))
pool.close()
pool.join()

假设您没有使用旧版本的 Python,您可以通过在 with 语句中使用 Pool 来进一步简化它:

with Pool() as pool:
    pool.starmap(calculate, 
                 itertools.product(range(3), range(5), range(4), range(10)))

使用 mapstarmap 的一个问题是它会做额外的工作以确保您按顺序返回结果。但是您只是返回 None 并忽略了它,那么为什么这样做呢?

使用apply_async没有这个问题。

您也可以将 map 替换为 imap_unordered,但是没有 istarmap_unordered,因此您需要包装您的函数以不需要 starmap:

def starcalculate(args):
    return calculate(*args)

with Pool() as pool:
    pool.imap_unordered(starcalculate,
                        itertools.product(range(3), range(5), range(4), range(10)))

1.如果您使用的是 spawnforkserver 启动方法——并且 spawn 是 Windows 的默认值——每个 child 进程都相当于 import你的模块。因此,所有不受 __main__ 守卫保护的 top-level 代码将在每个 child 中获得 运行。该模块试图保护您免受一些最坏的后果(例如,不是用 children 的指数爆炸来创建新的 children,您通常会得到一个异常,而不是用叉子轰炸您的计算机),但它不能使代码实际工作。