Wait for pool.apply_async inside a loop

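I'm trying to use multiprocessing in my Python code for the first time. I'm stuck because I can't get apply_async to wait for all of its processes to finish. I want to process the elements in smaller chunks and save the results as I work through a long list of elements.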

As a simple example:

import multiprocessing as mp

def fun(x, y):
    print("here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        # As I understand it, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()

I expect it to produce the following output:

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]

But it actually produces this (at least on my machine):

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]

Any suggestions would be much appreciated.

Try putting the entire for loop inside the if __name__ == '__main__': block.

...
if __name__ == '__main__':

    for val in range(10):
        buffer.append(val)
        print(f'Added value: {val}')
        if len(buffer) == 5:
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            # wait until they are ALL done
            for r in res:
                r.wait()
            # get the return values
            res = [r.get() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()
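About the Results: [None, None, None, None, None] lines in the question: AsyncResult.wait() blocks until the call has finished but always returns None, whereas AsyncResult.get() blocks and then returns the function's result (or re-raises the exception raised in the worker). So the separate wait() loop above is harmless but not strictly needed, because get() alone already waits. A minimal sketch contrasting the two, reusing the question's fun without its print:

```python
import multiprocessing as mp


def fun(x, y):
    return x + y


def main():
    with mp.Pool() as pool:
        async_results = [pool.apply_async(fun, args=(x, x)) for x in range(5)]

        # wait() only blocks until each call has finished; it returns None,
        # which is where the list of None values came from.
        waited = [r.wait() for r in async_results]

        # get() also blocks, then returns the value computed by fun
        # (or re-raises the exception raised in the worker).
        values = [r.get() for r in async_results]
    return waited, values


if __name__ == "__main__":
    waited, values = main()
    print(f"wait(): {waited}")
    print(f"get():  {values}")
```

Run as a script, this prints wait(): [None, None, None, None, None] followed by get():  [0, 2, 4, 6, 8].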

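Here is the original with some extra checks. I still don't know why, but it seems that the lines in the for loop are somehow being run in multiple Python processes.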

import multiprocessing as mp
import os
import pickle
import sys

def fun(x, y, pid=None):
    print(f"here pid:{pid}",file=sys.stderr)
    return (x+y,pid)

buffer = []
stuff = []

with open(r'c:\pyProjects\stuff.pkl','wb') as f:
    pickle.dump(stuff,f)

for val in range(10):
    buffer.append(val)
    pid = os.getpid()
    print(f'Added value: {val}.   pid={pid}')
    d = {'val':val,'pid':pid}
    with open(r'c:\pyProjects\stuff.pkl','rb') as f:
        try:
            stuff = pickle.load(f)
            stuff.append(d)
        except EOFError as e:
            s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
            print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    if len(buffer) == 5:
        print(buffer)
        # As I understand it, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
            res = [r.get() for r in res]
            print(f'\t\t\tResults: {res}')
            buffer = []
            pool.close()
            pool.join()
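The repeated Added value lines are consistent with how the spawn start method works (the default on Windows, and on macOS since Python 3.8). Each worker starts a fresh interpreter and imports the main script again under the name __mp_main__, so all module-level code, including the for loop, runs once more in every worker. Only the body of if __name__ == "__main__": is skipped there. The following minimal sketch makes this visible. It requests spawn explicitly through mp.get_context so it behaves the same on every platform, and it has to be run as a script file rather than pasted into an interactive session:

```python
import multiprocessing as mp
import os

# Module-level code: with the "spawn" start method each worker re-imports
# this script (as __mp_main__), so this line runs in the parent AND in
# every worker process.
print(f"module-level code ran: pid={os.getpid()} __name__={__name__!r}")


def which_main(_):
    # In a spawned worker this function comes from the re-imported script,
    # so the module-level __name__ it sees is "__mp_main__", not "__main__".
    return __name__


def main():
    # Force "spawn" (Windows' default) so the demo behaves the same everywhere.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(which_main, range(2))


if __name__ == "__main__":
    names = main()
    print(names)
```

The module-level line should appear once in the parent (with '__main__') and once in each of the two workers (with '__mp_main__'), followed by ['__mp_main__', '__mp_main__'].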

After the instrumented script has finished, you can load and inspect the pickle file with:
>>> import pickle
>>> from pprint import pprint
>>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
...     a = pickle.load(f)

>>> a.sort(key=lambda x: x['pid'])
>>> pprint(a)
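The question's goal is to work through a long list in chunks of five and save each chunk's results before moving on. One option is to create a single pool inside the main guard and reuse it for every chunk. This sketch swaps apply_async for Pool.starmap, which blocks until the whole chunk is done and returns the results in input order. The chunked helper is illustrative, and the place where results are collected is a placeholder for whatever "save" means in a real program (a file, a database, and so on):

```python
import multiprocessing as mp
from itertools import islice


def fun(x, y):
    return x + y


def chunked(iterable, size):
    """Yield successive lists of at most `size` items (illustrative helper)."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def process_in_chunks(items, chunk_size):
    saved = []
    # One pool for the whole run instead of a new Pool per chunk,
    # so the worker processes are started only once.
    with mp.Pool() as pool:
        for chunk in chunked(items, chunk_size):
            # starmap() unpacks each tuple into fun(x, y), blocks until the
            # whole chunk is finished and returns results in input order.
            results = pool.starmap(fun, [(x, x) for x in chunk])
            print(f"Results: {results}")
            # Placeholder "save" step: a real program might write to a file here.
            saved.extend(results)
    return saved


if __name__ == "__main__":
    all_results = process_in_chunks(range(10), 5)
    print(f"All results: {all_results}")
```

Run as a script, this prints Results: [0, 2, 4, 6, 8] and then Results: [10, 12, 14, 16, 18], matching the output the question expected (minus the per-call prints).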