使用 pool.map() 时防止字典副本
Preventing copies of dictionary when using pool.map()
我有一个函数 f(x)
我想并行计算值列表 xrange
。该函数执行如下操作:
def f(x, wrange, dict1, dict2):
out_list = []
v1 = dict1[x]
for w in wrange:
v2 = dict2[x-w]
out_list += [np.dot(v1, v2)]
return out_list
它从字典 dict1
中获取一个矩阵值,从字典 dict2
中获取一个向量,然后将它们相乘。现在我并行执行此操作的正常方法是这样的:
import functools
import multiprocessing
par_func = functools.partial(f, wrange=wrange, dict1=dict1, dict2=dict2)
p = multiprocessing.Pool(4)
ssdat = p.map(par_func, wrange)
p.close()
p.join()
现在当 dict1
和 dict2
是大词典时,这会导致代码失败并显示错误
File "/anaconda3/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
我认为这是因为 pool
正在为我的函数的每次评估制作 dict1
和 dict2
的副本。有没有一种有效的方法可以将这些字典设置为共享内存对象? map
是执行此操作的最佳函数吗?
如果您想在使用多进程的进程之间共享内存,您需要明确地与 multiprocessing.Array 共享对象。这并不理想,因为您想要访问字典中的元素并且找到正确的数据可能很耗时。如果它确实成为您的问题,可能有解决方法。
如@Peque 所述,另一种选择是使用 threading. With threading, memory is automatically shared across all processes but you can run into performance issues due to the global interpreter lock (GIL)。 GIL 是 Python 确保线程安全并避免竞争条件的方法。
如果您使用的是基于 fork
的系统(阅读:不是 Windows),解决此问题的一种方法是将 dict
放在全局变量中,编写一个不将它们作为参数的函数,而只是从它自己的全局变量中访问它们,然后使用它。 ,但您的用例可以轻松替换为全局变量和 def
-ed 函数:
import multiprocessing
# Assumes wrange/dict1/dict2 defined or imported somewhere at global scope,
# prior to creating the Pool
def par_func(x):
return f(x, wrange, dict1, dict2)
# Using with statement implicitly terminates the pool, saving close/join calls
# and guaranteeing an exception while mapping doesn't leave the pool alive indefinitely
with multiprocessing.Pool(4) as p:
ssdat = p.map(par_func, wrange)
在创建 Pool
后,对 dict1
/dict2
的更改不会在进程之间反映出来,但您似乎无论如何都以只读方式使用它,所以这不是问题。
如果你在 Windows,或者需要改变 dict
,你可以随时 make a multiprocessing.Manager
and make dict
proxies with the dict
method of the manager(这些是共享的 dict
,更新关键赋值),但它更丑陋,更慢,所以如果可能的话,我会劝阻它。
我有一个函数 f(x)
我想并行计算值列表 xrange
。该函数执行如下操作:
def f(x, wrange, dict1, dict2):
out_list = []
v1 = dict1[x]
for w in wrange:
v2 = dict2[x-w]
out_list += [np.dot(v1, v2)]
return out_list
它从字典 dict1
中获取一个矩阵值,从字典 dict2
中获取一个向量,然后将它们相乘。现在我并行执行此操作的正常方法是这样的:
import functools
import multiprocessing
par_func = functools.partial(f, wrange=wrange, dict1=dict1, dict2=dict2)
p = multiprocessing.Pool(4)
ssdat = p.map(par_func, wrange)
p.close()
p.join()
现在当 dict1
和 dict2
是大词典时,这会导致代码失败并显示错误
File "/anaconda3/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
我认为这是因为 pool
正在为我的函数的每次评估制作 dict1
和 dict2
的副本。有没有一种有效的方法可以将这些字典设置为共享内存对象? map
是执行此操作的最佳函数吗?
如果您想在使用多进程的进程之间共享内存,您需要明确地与 multiprocessing.Array 共享对象。这并不理想,因为您想要访问字典中的元素并且找到正确的数据可能很耗时。如果它确实成为您的问题,可能有解决方法。
如@Peque 所述,另一种选择是使用 threading. With threading, memory is automatically shared across all processes but you can run into performance issues due to the global interpreter lock (GIL)。 GIL 是 Python 确保线程安全并避免竞争条件的方法。
如果您使用的是基于 fork
的系统(阅读:不是 Windows),解决此问题的一种方法是将 dict
放在全局变量中,编写一个不将它们作为参数的函数,而只是从它自己的全局变量中访问它们,然后使用它。 def
-ed 函数:
import multiprocessing
# Assumes wrange/dict1/dict2 defined or imported somewhere at global scope,
# prior to creating the Pool
def par_func(x):
return f(x, wrange, dict1, dict2)
# Using with statement implicitly terminates the pool, saving close/join calls
# and guaranteeing an exception while mapping doesn't leave the pool alive indefinitely
with multiprocessing.Pool(4) as p:
ssdat = p.map(par_func, wrange)
在创建 Pool
后,对 dict1
/dict2
的更改不会在进程之间反映出来,但您似乎无论如何都以只读方式使用它,所以这不是问题。
如果你在 Windows,或者需要改变 dict
,你可以随时 make a multiprocessing.Manager
and make dict
proxies with the dict
method of the manager(这些是共享的 dict
,更新关键赋值),但它更丑陋,更慢,所以如果可能的话,我会劝阻它。