Wait for pool.apply_async inside a loop
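This is my first attempt at using multiprocessing in my Python code. I am stuck because I cannot get apply_async to wait for all of its processes to finish. I want to process elements in smaller chunks and save the results as I work through a long list of elements.
Here is a simple example: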
import multiprocessing as mp

def fun(x, y):
    print("here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()
I expected it to produce the following output:
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]
But it actually produces this (at least on my machine):
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Any advice is greatly appreciated.
Try putting the entire for loop inside the if __name__ == '__main__': block.
...
if __name__ == '__main__':
    for val in range(10):
        buffer.append(val)
        print(f'Added value: {val}')
        if len(buffer) == 5:
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            # wait til they are ALL done ?
            for r in res:
                r.wait()
            # get the return values
            res = [r.get() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()
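As a variation on the code above, here is a minimal sketch that creates the pool once instead of once per chunk and collects values with get(). On an AsyncResult, wait() blocks but always returns None, which would explain the [None, None, ...] results in the question, while get() blocks and returns the function's value. The helper names chunked and process_in_chunks are made up for this illustration and are not part of the original code.

```python
import multiprocessing as mp


def fun(x, y):
    return x + y


def chunked(values, size):
    """Yield successive lists of at most `size` items from `values`."""
    chunk = []
    for val in values:
        chunk.append(val)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # leftover items when the input is not a multiple of size
        yield chunk


def process_in_chunks(pool, values, size):
    """Submit one chunk at a time and yield its results once all are done."""
    for chunk in chunked(values, size):
        pending = [pool.apply_async(fun, args=(x, x)) for x in chunk]
        # get() blocks until the task has finished and returns its value;
        # wait() would also block, but it always returns None.
        yield [r.get() for r in pending]


if __name__ == "__main__":
    # One pool for the whole run. Leaving the with-block terminates the
    # pool, which is fine here because every result was already collected.
    with mp.Pool() as pool:
        for results in process_in_chunks(pool, range(10), 5):
            print(f"Results: {results}")
```

Because process_in_chunks is a generator, each chunk's results are available (and could be saved) before the next chunk is submitted, which is what the question was after.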
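Here is the original with some extra checks added. I still don't know why, but it looks like the lines inside the for loop are somehow being run in multiple Python processes.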
import multiprocessing as mp
import os
import pickle
import sys

def fun(x, y, pid=None):
    print(f"here pid:{pid}",file=sys.stderr)
    return (x+y,pid)

buffer = []
stuff = []
# start with an empty list in the pickle file
with open(r'c:\pyProjects\stuff.pkl','wb') as f:
    pickle.dump(stuff,f)

for val in range(10):
    buffer.append(val)
    pid = os.getpid()
    print(f'Added value: {val}. pid={pid}')
    # record which process executed this iteration
    d = {'val':val,'pid':pid}
    with open(r'c:\pyProjects\stuff.pkl','rb') as f:
        try:
            stuff = pickle.load(f)
            stuff.append(d)
        except EOFError as e:
            s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
            print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    if len(buffer) == 5:
        print(buffer)
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
            res = [r.get() for r in res]
            print(f'\t\t\tResults: {res}')
            buffer = []
            pool.close()
            pool.join()
Once it has finished, you can load and inspect the pickled file with:
>>> import pickle
>>> from pprint import pprint
>>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
...     a = pickle.load(f)
...
>>> a.sort(key=lambda x: x['pid'])
>>> pprint(a)
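A likely explanation for the repeated "Added value" lines: on Windows, multiprocessing starts workers with the "spawn" method, which launches a fresh interpreter that imports the main script, so any top-level code outside the if __name__ == "__main__": guard runs again in every worker. The sketch below is one way to observe this, assuming it is saved as a script and run directly; where_am_i is a hypothetical helper name. With the "fork" start method the top-level print appears only once, so the output depends on the platform.

```python
import multiprocessing as mp
import os


def where_am_i():
    """Return the current process id and this module's __name__."""
    return os.getpid(), __name__


# Unguarded top-level code: it runs in the parent, and again in every
# worker whose start method re-imports this file (spawn on Windows).
print(f"top-level code running in pid {os.getpid()} as {__name__!r}")

if __name__ == "__main__":
    # Only the parent process enters this block, so only it creates a pool.
    with mp.Pool(2) as pool:
        pending = [pool.apply_async(where_am_i) for _ in range(4)]
        for r in pending:
            pid, name = r.get()
            print(f"task ran in pid {pid}, module name {name!r}")
```

Under spawn, the module name reported by the workers is not "__main__", which is why the guard keeps them from creating pools of their own while the rest of the module body still executes.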