Python 中的多处理同步问题,双 for 循环

Sync issues with multiprocessing in Python, double for loop

受教程multiprocessing factorial的启发,我尝试对一个微不足道的力计算模块进行多处理。 我主要担心的是 queue.get 功能没有按预期的顺序检索。例如,它不是给出 [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5],而是根据不同的处理器给出混乱的输出。 1)如何根据进程调用顺序追加队列中的结果?我应该使用池、地图、锁或任何类似的东西吗? 2)如何避免内存 sync/overwriting 问题?

def mp_worker(istart, iend, x, out_q1, out_q2):
    global_N = len(x)
    outdict1 = []
    outdict2 = []
    k = 0


       for i in range(istart,iend,1):
            temp_FX = 0
            temp_FY = 0
            for j in range(global_N):
                if i != j:
                    temp_FX = temp_FX + (x[j]-x[i])
                    temp_FY = temp_FY + (x[j]-x[i])
            outdict1.append(temp_FX)
            outdict2.append(temp_FY)
            k = k + 1

    out_q1.put(outdict1)
    out_q2.put(outdict2)

def mp_factorizer( nprocs):

    x = mem.x
    FORCE = mem.FORCE
    N = len(FORCE)

    out_q1 = multiprocessing.Queue()
    out_q2 = multiprocessing.Queue()
    chunksize = int(math.ceil(N / float(nprocs)))
    procs = []

    for i in range(nprocs):
        istart = chunksize * i
        iend = chunksize * (i + 1)
        p = multiprocessing.Process(
                target=mp_worker,
                args=(istart, iend, x, out_q1, out_q2))
        procs.append(p)
        p.start()

    # Collect all results into a single result dict. We know how many dicts
    # with results to expect.
    resultdict1 = []
    resultdict2 = []
    for i in range(nprocs):
        resultdict1 = resultdict1 + out_q1.get()
        resultdict2 = resultdict2 + out_q2.get()

    # Wait for all worker processes to finish
    for p in procs:
        p.join()

    return resultdict1

项目以工作进程恰好完成的顺序添加到队列中。如果你想强制下单,你必须...强制下单;-)

Process 不适合这个。它们的执行本质上是无序的,顺序很可能会从一个 运行 下一个改变。

在这种情况下,可能最简单:首先,完全抛开队列。像这样结束你的 mp_worker()

return outdict1, outdict2

然后使用Pool。有几种方法可以使用一种。最像你已经在做的事情看起来像:

pool = multiprocessing.Pool(nprocs)
for i in range(nprocs):
    istart = chunksize * i
    iend = chunksize * (i + 1)
    p = pool.apply_async(mp_worker, (istart, iend, x))
    procs.append(p)

resultdict1 = []
resultdict2 = []
for p in procs:
    t1, t2 = p.get()
    resultdict1.extend(t1)
    resultdict2.extend(t2)

pool.close()
pool.join()

现在获取结果的顺序与传递任务的顺序相同;订单已被强制执行。

注意:从 + 切换到 .extend() 在逻辑上不是必需的,但可以将二次时间(在循环迭代次数中)操作减少为分摊线性时间操作。这与多处理无关。 somelist = somelist + anotherlist 总是更好地编码为 somelist.extend(anotherlist)

关于OS

这里有一个关于为什么 "it worked" 在 Windows 而不是 Linux 的猜测:从历史上看,进程创建在 Linux 比 Windows 更便宜( Windows 投入更多精力来加速其线程)。这使得当进程执行大约相同数量的工作时,进程更有可能以它们开始的相同顺序在 Windows 结束。但他们肯定 可以 在 Windows 上完成 "out of order" 也是。

无论如何,Python 对此没有任何发言权:如果您需要特定订单,则必须强制执行该订单。