多处理:如何 mp.map 将元素存储在列表中的函数?
Multiprocessing: How to mp.map a function storing elements in a list?
我有一个类似于下面的程序:
import time
from multiprocessing import Pool
class a_system():
def __init__(self,N):
self.N = N
self.L = [0 for _ in range(self.N)]
def comp(self,n):
self.L[n] = 1
return self.L
def reset(self):
self.L = [0 for _ in range(self.N)]
def individual_sim(iter):
global B, L, sys
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B
def simulate(N_mc):
global B, L, sys
L = [[] for _ in range(N_mc)]
B = 0
sys = a_system(N_mc)
[*map(individual_sim, range(N_mc))]
# with Pool() as P:
# P.map(individual_sim,range(N_mc))
return L, B
if __name__=="__main__":
start = time.time()
L, B = simulate(N_mc=5)
print(L)
print(B)
print("Time elapsed: ",time.time()-start)
在这里,我想将 [*map(individual_sim, range(N_mc))]
行与多处理并行化。但是,将此行替换为
with Pool() as P:
P.map(individual_sim,range(N_mc))
returns 列表的空列表。
如果我改为使用 P.map_async
、P.imap
或 P.imap_unordered
,我不会收到错误,但列表和 B
留空。
如何并行化此代码?
P.S。
我已经从 multiprocessing.pool
尝试了 ThreadPool
,但我想避免这种情况,因为 class a_system
比这里显示的要复杂一些,需要每个工人都有不同的副本(我得到一个exit code 139 (interrupted by signal 11: SIGSEGV)
)。
P.S.2
我可能会尝试使用 sharedctypes 或 Managers (?),但我不确定它们是如何工作的,也不确定我应该使用哪一个(或组合使用?)。
P.S.3
我也试过将 individual_sim
修改为
def individual_sim(iter,B,L,sys):
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B
并在 simulation
中使用以下内容:
from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
P.map(part_individual_sim,range(N_mc))
但我仍然得到空列表。
multiprocessing
模块通过 fork
主进程(或执行 Python 解释器的更多副本,尤其是在 Windows 下)工作。
因此,您会看到全局变量,但它们不会在进程之间共享 --- 除非您采取特殊措施,例如显式共享内存。您最好将所需的状态作为函数参数(或通过 Pool
的 initializer
和 initargs
传递)并通过 return 值传回结果。
这往往会稍微限制您的设计选择,尤其是当您需要传递大量状态时(例如,作为您想要拟合的数据)
它是一个非常轻量级的包装器,围绕着非常低级的原语,因此它不像 Dask
这样的东西那么有特色,但如果你能忍受这些限制,性能往往会更好
编辑以包含一些演示代码,这些代码假设您问题中的 N_mc
变量与您进行一些 Monte Carlo/randomised 近似有关。我首先引入一些库:
from multiprocessing import Pool
from PIL import Image
import numpy as np
并定义一个辅助函数及其初始化代码:
def initfn(path):
# make sure worker processes don't share RNG state, see:
# https://github.com/numpy/numpy/issues/9650
np.random.seed()
global image
with Image.open(path) as img:
image = np.asarray(img.convert('L'))
def worker(i, nsamps):
height, width = image.shape
subset = image[
np.random.randint(height, size=nsamps),
np.random.randint(width, size=nsamps),
]
return np.mean(subset)
def mc_mean(path, nsamples, niter):
with Pool(initializer=initfn, initargs=(path,)) as pool:
params = [(i, nsamples) for i in range(niter)]
return pool.starmap(worker, params)
即initfn
将 JPEG/PNG 文件读入一个 numpy 数组,然后 worker
只计算一些随机像素子集的平均值(即亮度)。请注意,彩色图像作为 3d 矩阵加载,由 [row, col, channel]
索引(通道通常为 0=红色,1=蓝色,2=绿色)。此外,我们还显式调用 np.random.seed
以确保我们的 worker 作业不会获得相同的随机值序列。
然后我们可以 运行 并绘制输出以确保一切正常:
import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')
filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)
# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')
# also calculate/display expected distribution
with Image.open(filename) as img:
arr = np.asarray(img.convert('L'))
# approximate distribution of monte-carlo error
mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))
mn,mx = plt.xlim()
plt.xlim(mn, mx)
x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()
这应该给我们这样的东西:
显然这将取决于所使用的图像,这是来自 this JPEG。
我不太清楚这里的业务逻辑是什么,但是您不能从子进程中修改父进程中的全局变量。单独的进程不共享它们的地址 space。
不过,您可以将 L
设为 Manager.List
并将 B
设为 Manager.Value
,以便从您的工作进程中修改它们。管理器对象存在于单独的服务器进程中,您可以使用代理对象修改它们。此外,您需要在修改这些共享对象时使用 Manager.Lock
以防止数据损坏。
这是一个精简的示例,可以帮助您入门:
import time
from multiprocessing import Pool, Manager
def individual_sim(mlist, mvalue, mlock, idx):
# in your real computation, make sure to not hold the lock longer than
# really needed (e.g. calculations without holding lock)
with mlock:
mlist[idx] += 10
mvalue.value += sum(mlist)
def simulate(n_workers, n):
with Manager() as m:
mlist = m.list([i for i in range(n)])
print(mlist)
mvalue = m.Value('i', 0)
mlock = m.Lock()
iterable = [(mlist, mvalue, mlock, i) for i in range(n)]
with Pool(processes=n_workers) as pool:
pool.starmap(individual_sim, iterable)
# convert to non-shared objects before terminating manager
mlist = list(mlist)
mvalue = mvalue.value
return mlist, mvalue
if __name__=="__main__":
N_WORKERS = 4
N = 20
start = time.perf_counter()
L, B = simulate(N_WORKERS, N)
print(L)
print(B)
print("Time elapsed: ",time.perf_counter() - start)
示例输出:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed: 0.14064819699706277
Process finished with exit code 0
也可以使用 Pool 的 initializer
参数在 worker 初始化时传递代理并将它们注册为全局变量,而不是将它们作为 starmap 调用的常规参数发送。
关于 Manager
用法的更多信息(相关:嵌套代理)我写了 。
我有一个类似于下面的程序:
import time
from multiprocessing import Pool
class a_system():
def __init__(self,N):
self.N = N
self.L = [0 for _ in range(self.N)]
def comp(self,n):
self.L[n] = 1
return self.L
def reset(self):
self.L = [0 for _ in range(self.N)]
def individual_sim(iter):
global B, L, sys
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B
def simulate(N_mc):
global B, L, sys
L = [[] for _ in range(N_mc)]
B = 0
sys = a_system(N_mc)
[*map(individual_sim, range(N_mc))]
# with Pool() as P:
# P.map(individual_sim,range(N_mc))
return L, B
if __name__=="__main__":
start = time.time()
L, B = simulate(N_mc=5)
print(L)
print(B)
print("Time elapsed: ",time.time()-start)
在这里,我想将 [*map(individual_sim, range(N_mc))]
行与多处理并行化。但是,将此行替换为
with Pool() as P:
P.map(individual_sim,range(N_mc))
returns 列表的空列表。
如果我改为使用 P.map_async
、P.imap
或 P.imap_unordered
,我不会收到错误,但列表和 B
留空。
如何并行化此代码?
P.S。
我已经从 multiprocessing.pool
尝试了 ThreadPool
,但我想避免这种情况,因为 class a_system
比这里显示的要复杂一些,需要每个工人都有不同的副本(我得到一个exit code 139 (interrupted by signal 11: SIGSEGV)
)。
P.S.2 我可能会尝试使用 sharedctypes 或 Managers (?),但我不确定它们是如何工作的,也不确定我应该使用哪一个(或组合使用?)。
P.S.3
我也试过将 individual_sim
修改为
def individual_sim(iter,B,L,sys):
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B
并在 simulation
中使用以下内容:
from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
P.map(part_individual_sim,range(N_mc))
但我仍然得到空列表。
multiprocessing
模块通过 fork
主进程(或执行 Python 解释器的更多副本,尤其是在 Windows 下)工作。
因此,您会看到全局变量,但它们不会在进程之间共享 --- 除非您采取特殊措施,例如显式共享内存。您最好将所需的状态作为函数参数(或通过 Pool
的 initializer
和 initargs
传递)并通过 return 值传回结果。
这往往会稍微限制您的设计选择,尤其是当您需要传递大量状态时(例如,作为您想要拟合的数据)
它是一个非常轻量级的包装器,围绕着非常低级的原语,因此它不像 Dask
这样的东西那么有特色,但如果你能忍受这些限制,性能往往会更好
编辑以包含一些演示代码,这些代码假设您问题中的 N_mc
变量与您进行一些 Monte Carlo/randomised 近似有关。我首先引入一些库:
from multiprocessing import Pool
from PIL import Image
import numpy as np
并定义一个辅助函数及其初始化代码:
def initfn(path):
# make sure worker processes don't share RNG state, see:
# https://github.com/numpy/numpy/issues/9650
np.random.seed()
global image
with Image.open(path) as img:
image = np.asarray(img.convert('L'))
def worker(i, nsamps):
height, width = image.shape
subset = image[
np.random.randint(height, size=nsamps),
np.random.randint(width, size=nsamps),
]
return np.mean(subset)
def mc_mean(path, nsamples, niter):
with Pool(initializer=initfn, initargs=(path,)) as pool:
params = [(i, nsamples) for i in range(niter)]
return pool.starmap(worker, params)
即initfn
将 JPEG/PNG 文件读入一个 numpy 数组,然后 worker
只计算一些随机像素子集的平均值(即亮度)。请注意,彩色图像作为 3d 矩阵加载,由 [row, col, channel]
索引(通道通常为 0=红色,1=蓝色,2=绿色)。此外,我们还显式调用 np.random.seed
以确保我们的 worker 作业不会获得相同的随机值序列。
然后我们可以 运行 并绘制输出以确保一切正常:
import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')
filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)
# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')
# also calculate/display expected distribution
with Image.open(filename) as img:
arr = np.asarray(img.convert('L'))
# approximate distribution of monte-carlo error
mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))
mn,mx = plt.xlim()
plt.xlim(mn, mx)
x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()
这应该给我们这样的东西:
显然这将取决于所使用的图像,这是来自 this JPEG。
我不太清楚这里的业务逻辑是什么,但是您不能从子进程中修改父进程中的全局变量。单独的进程不共享它们的地址 space。
不过,您可以将 L
设为 Manager.List
并将 B
设为 Manager.Value
,以便从您的工作进程中修改它们。管理器对象存在于单独的服务器进程中,您可以使用代理对象修改它们。此外,您需要在修改这些共享对象时使用 Manager.Lock
以防止数据损坏。
这是一个精简的示例,可以帮助您入门:
import time
from multiprocessing import Pool, Manager
def individual_sim(mlist, mvalue, mlock, idx):
# in your real computation, make sure to not hold the lock longer than
# really needed (e.g. calculations without holding lock)
with mlock:
mlist[idx] += 10
mvalue.value += sum(mlist)
def simulate(n_workers, n):
with Manager() as m:
mlist = m.list([i for i in range(n)])
print(mlist)
mvalue = m.Value('i', 0)
mlock = m.Lock()
iterable = [(mlist, mvalue, mlock, i) for i in range(n)]
with Pool(processes=n_workers) as pool:
pool.starmap(individual_sim, iterable)
# convert to non-shared objects before terminating manager
mlist = list(mlist)
mvalue = mvalue.value
return mlist, mvalue
if __name__=="__main__":
N_WORKERS = 4
N = 20
start = time.perf_counter()
L, B = simulate(N_WORKERS, N)
print(L)
print(B)
print("Time elapsed: ",time.perf_counter() - start)
示例输出:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed: 0.14064819699706277
Process finished with exit code 0
也可以使用 Pool 的 initializer
参数在 worker 初始化时传递代理并将它们注册为全局变量,而不是将它们作为 starmap 调用的常规参数发送。
关于 Manager
用法的更多信息(相关:嵌套代理)我写了