使用 pool.map() 时防止字典副本

Preventing copies of dictionary when using pool.map()

我有一个函数 f(x) 我想并行计算值列表 xrange。该函数执行如下操作:

def f(x, wrange, dict1, dict2):

    out_list = []

    v1 = dict1[x]

    for w in wrange:
        v2 = dict2[x-w]
        out_list += [np.dot(v1, v2)]

    return out_list

它从字典 dict1 中获取一个矩阵值,从字典 dict2 中获取一个向量,然后将它们相乘。现在我并行执行此操作的正常方法是这样的:

import functools
import multiprocessing

par_func = functools.partial(f, wrange=wrange, dict1=dict1, dict2=dict2)

p = multiprocessing.Pool(4)
ssdat = p.map(par_func, wrange)
p.close()
p.join()

现在当 dict1dict2 是大词典时,这会导致代码失败并显示错误

File "/anaconda3/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

我认为这是因为 pool 正在为我的函数的每次评估制作 dict1dict2 的副本。有没有一种有效的方法可以将这些字典设置为共享内存对象? map 是执行此操作的最佳函数吗?

如果您想在使用多进程的进程之间共享内存,您需要明确地与 multiprocessing.Array 共享对象。这并不理想,因为您想要访问字典中的元素并且找到正确的数据可能很耗时。如果它确实成为您的问题,可能有解决方法。

如@Peque 所述,另一种选择是使用 threading. With threading, memory is automatically shared across all processes but you can run into performance issues due to the global interpreter lock (GIL)。 GIL 是 Python 确保线程安全并避免竞争条件的方法。

如果您使用的是基于 fork 的系统(阅读:不是 Windows),解决此问题的一种方法是将 dict 放在全局变量中,编写一个不将它们作为参数的函数,而只是从它自己的全局变量中访问它们,然后使用它。 ,但您的用例可以轻松替换为全局变量和 def-ed 函数:

import multiprocessing

# Assumes wrange/dict1/dict2 defined or imported somewhere at global scope,
# prior to creating the Pool
def par_func(x):
    return f(x, wrange, dict1, dict2)

# Using with statement implicitly terminates the pool, saving close/join calls
# and guaranteeing an exception while mapping doesn't leave the pool alive indefinitely
with multiprocessing.Pool(4) as p:
    ssdat = p.map(par_func, wrange)

在创建 Pool 后,对 dict1/dict2 的更改不会在进程之间反映出来,但您似乎无论如何都以只读方式使用它,所以这不是问题。

如果你在 Windows,或者需要改变 dict,你可以随时 make a multiprocessing.Manager and make dict proxies with the dict method of the manager(这些是共享的 dict,更新关键赋值),但它更丑陋,更慢,所以如果可能的话,我会劝阻它。