多处理:如何 mp.map 将元素存储在列表中的函数?

Multiprocessing: How to mp.map a function storing elements in a list?

我有一个类似于下面的程序:

import time
from multiprocessing import Pool

class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]
    def comp(self,n):
        self.L[n] = 1
        return self.L
    def reset(self):
        self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B

if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)

在这里,我想将 [*map(individual_sim, range(N_mc))] 行与多处理并行化。但是,将此行替换为

with Pool() as P:
     P.map(individual_sim,range(N_mc))

returns 列表的空列表。

如果我改为使用 P.map_asyncP.imapP.imap_unordered,我不会收到错误,但列表和 B 留空。

如何并行化此代码?

P.S。 我已经从 multiprocessing.pool 尝试了 ThreadPool,但我想避免这种情况,因为 class a_system 比这里显示的要复杂一些,需要每个工人都有不同的副本(我得到一个exit code 139 (interrupted by signal 11: SIGSEGV))。

P.S.2 我可能会尝试使用 sharedctypes 或 Managers (?),但我不确定它们是如何工作的,也不确定我应该使用哪一个(或组合使用?)。

P.S.3 我也试过将 individual_sim 修改为

def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

并在 simulation 中使用以下内容:

   from functools import partial
   part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
   with Pool() as P:
        P.map(part_individual_sim,range(N_mc))

但我仍然得到空列表。

multiprocessing 模块通过 fork 主进程(或执行 Python 解释器的更多副本,尤其是在 Windows 下)工作。

因此,您会看到全局变量,但它们不会在进程之间共享 --- 除非您采取特殊措施,例如显式共享内存。您最好将所需的状态作为函数参数(或通过 Poolinitializerinitargs 传递)并通过 return 值传回结果。

这往往会稍微限制您的设计选择,尤其是当您需要传递大量状态时(例如,作为您想要拟合的数据)

它是一个非常轻量级的包装器,围绕着非常低级的原语,因此它不像 Dask 这样的东西那么有特色,但如果你能忍受这些限制,性能往往会更好

编辑以包含一些演示代码,这些代码假设您问题中的 N_mc 变量与您进行一些 Monte Carlo/randomised 近似有关。我首先引入一些库:

from multiprocessing import Pool

from PIL import Image
import numpy as np

并定义一个辅助函数及其初始化代码:

def initfn(path):
    # make sure worker processes don't share RNG state, see:
    #   https://github.com/numpy/numpy/issues/9650
    np.random.seed()

    global image
    with Image.open(path) as img:
        image = np.asarray(img.convert('L'))

def worker(i, nsamps):
    height, width = image.shape
    subset = image[
        np.random.randint(height, size=nsamps),
        np.random.randint(width, size=nsamps),
    ]
    return np.mean(subset)

def mc_mean(path, nsamples, niter):
    with Pool(initializer=initfn, initargs=(path,)) as pool:
        params = [(i, nsamples) for i in range(niter)]
        return pool.starmap(worker, params)

initfn 将 JPEG/PNG 文件读入一个 numpy 数组,然后 worker 只计算一些随机像素子集的平均值(即亮度)。请注意,彩色图像作为 3d 矩阵加载,由 [row, col, channel] 索引(通道通常为 0=红色,1=蓝色,2=绿色)。此外,我们还显式调用 np.random.seed 以确保我们的 worker 作业不会获得相同的随机值序列。

然后我们可以 运行 并绘制输出以确保一切正常:

import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')

filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)

# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')

# also calculate/display expected distribution
with Image.open(filename) as img:
    arr = np.asarray(img.convert('L'))
    # approximate distribution of monte-carlo error 
    mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))

mn,mx = plt.xlim()
plt.xlim(mn, mx)

x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()

这应该给我们这样的东西:

显然这将取决于所使用的图像,这是来自 this JPEG

我不太清楚这里的业务逻辑是什么,但是您不能从子进程中修改父进程中的全局变量。单独的进程不共享它们的地址 space。

不过,您可以将 L 设为 Manager.List 并将 B 设为 Manager.Value,以便从您的工作进程中修改它们。管理器对象存在于单独的服务器进程中,您可以使用代理对象修改它们。此外,您需要在修改这些共享对象时使用 Manager.Lock 以防止数据损坏。

这是一个精简的示例,可以帮助您入门:

import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. calculations without holding lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)


def simulate(n_workers, n):

    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()

        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]

        with Pool(processes=n_workers) as pool:
             pool.starmap(individual_sim, iterable)

        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value

    return mlist, mvalue


if __name__=="__main__":

    N_WORKERS = 4
    N = 20

    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ",time.perf_counter() - start)

示例输出:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed:  0.14064819699706277

Process finished with exit code 0

也可以使用 Pool 的 initializer 参数在 worker 初始化时传递代理并将它们注册为全局变量,而不是将它们作为 starmap 调用的常规参数发送。

关于 Manager 用法的更多信息(相关:嵌套代理)我写了