Python 多处理池映射和 imap
Python multiprocessing Pool map and imap
我有一个 multiprocessing
脚本,其中 pool.map
有效。问题是并非所有进程都需要那么长的时间才能完成,因此某些进程会进入休眠状态,因为它们会等待所有进程完成(与 中的问题相同)。有些文件在不到一秒内完成,有些则需要几分钟(或几小时)。
如果我对手册 (and this post) 的理解正确,pool.imap
不会等待所有进程完成,如果一个进程完成,它会提供一个新文件进行处理。当我尝试这样做时,脚本正在加速处理文件,小文件按预期处理,大文件(需要更多时间处理)直到最后才完成(在没有通知的情况下被杀死?)。这是 pool.imap
的正常行为,还是我需要添加更多 commands/parameters?当我在 else
部分添加 time.sleep(100)
作为测试时,它正在处理更多大文件,但其他进程却睡着了。有什么建议么 ?谢谢
def process_file(infile):
#read infile
#compare things in infile
#acquire Lock, save things in outfile, release Lock
#delete infile
def main():
#nprocesses = 8
global filename
pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
for d in pathlist:
os.chdir(d)
todolist = []
for infile in os.listdir():
todolist.append(infile)
try:
p = Pool(processes=nprocesses)
p.imap(process_file, todolist)
except KeyboardInterrupt:
print("Shutting processes down")
# Optionally try to gracefully shut down the worker processes here.
p.close()
p.terminate()
p.join()
except StopIteration:
continue
else:
time.sleep(100)
os.chdir('..')
p.close()
p.join()
if __name__ == '__main__':
main()
由于您已经将所有文件放入列表中,因此可以将它们直接放入队列中。然后队列与您的子进程共享,子进程从队列中获取文件名并执行它们的工作。无需执行两次(首先进入列表,然后 Pool.imap 腌制列表)。 Pool.imap 做的完全一样,只是你不知道。
todolist = []
for infile in os.listdir():
todolist.append(infile)
可以替换为:
todolist = Queue()
for infile in os.listdir():
todolist.put(infile)
完整的解决方案如下所示:
def process_file(inqueue):
for infile in iter(inqueue.get, "STOP"):
#do stuff until inqueue.get returns "STOP"
#read infile
#compare things in infile
#acquire Lock, save things in outfile, release Lock
#delete infile
def main():
nprocesses = 8
global filename
pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
for d in pathlist:
os.chdir(d)
todolist = Queue()
for infile in os.listdir():
todolist.put(infile)
process = [Process(target=process_file,
args=(todolist) for x in range(nprocesses)]
for p in process:
#task the processes to stop when all files are handled
#"STOP" is at the very end of queue
todolist.put("STOP")
for p in process:
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()
我有一个 multiprocessing
脚本,其中 pool.map
有效。问题是并非所有进程都需要那么长的时间才能完成,因此某些进程会进入休眠状态,因为它们会等待所有进程完成(与
如果我对手册 (and this post) 的理解正确,pool.imap
不会等待所有进程完成,如果一个进程完成,它会提供一个新文件进行处理。当我尝试这样做时,脚本正在加速处理文件,小文件按预期处理,大文件(需要更多时间处理)直到最后才完成(在没有通知的情况下被杀死?)。这是 pool.imap
的正常行为,还是我需要添加更多 commands/parameters?当我在 else
部分添加 time.sleep(100)
作为测试时,它正在处理更多大文件,但其他进程却睡着了。有什么建议么 ?谢谢
def process_file(infile):
#read infile
#compare things in infile
#acquire Lock, save things in outfile, release Lock
#delete infile
def main():
#nprocesses = 8
global filename
pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
for d in pathlist:
os.chdir(d)
todolist = []
for infile in os.listdir():
todolist.append(infile)
try:
p = Pool(processes=nprocesses)
p.imap(process_file, todolist)
except KeyboardInterrupt:
print("Shutting processes down")
# Optionally try to gracefully shut down the worker processes here.
p.close()
p.terminate()
p.join()
except StopIteration:
continue
else:
time.sleep(100)
os.chdir('..')
p.close()
p.join()
if __name__ == '__main__':
main()
由于您已经将所有文件放入列表中,因此可以将它们直接放入队列中。然后队列与您的子进程共享,子进程从队列中获取文件名并执行它们的工作。无需执行两次(首先进入列表,然后 Pool.imap 腌制列表)。 Pool.imap 做的完全一样,只是你不知道。
todolist = []
for infile in os.listdir():
todolist.append(infile)
可以替换为:
todolist = Queue()
for infile in os.listdir():
todolist.put(infile)
完整的解决方案如下所示:
def process_file(inqueue):
for infile in iter(inqueue.get, "STOP"):
#do stuff until inqueue.get returns "STOP"
#read infile
#compare things in infile
#acquire Lock, save things in outfile, release Lock
#delete infile
def main():
nprocesses = 8
global filename
pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
for d in pathlist:
os.chdir(d)
todolist = Queue()
for infile in os.listdir():
todolist.put(infile)
process = [Process(target=process_file,
args=(todolist) for x in range(nprocesses)]
for p in process:
#task the processes to stop when all files are handled
#"STOP" is at the very end of queue
todolist.put("STOP")
for p in process:
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()