Python 进程在 IO 完成之前终止
Python Process Terminate before IO completed
我正在使用 python 多处理库来处理一组进程中的信息。这些流程还包含进一步划分必须完成的工作量的流程。有一个 Manager.Queue 累积所有使用数据的进程的结果。
在python脚本的主线程中。我尝试使用 join 来阻塞主线程,直到我们可以合理地确定是否所有子进程都已完成,然后将输出写入单个文件。然而,在所有数据写入文件之前,系统终止并且文件关闭。
以下代码是对上述解决方案实施的简化提取。
对于 inQueues 中的队列:
queue.join()
for p in processes:
p.join()
print "At the end output has: " + str(out_queue.qsize()) + " records"
with open("results.csv", "w") as out_file:
out_file.write("Algorithm,result\n")
while not out_queue.empty():
res = out_queue.get()
out_file.write(res['algorithm'] + ","+res['result']+"\n")
out_queue.task_done()
time.sleep(0.05)
out_queue.join()
out_file.close()
out_queue.qsize() 将打印超过 500 条可用记录,但只有 100 条记录将打印到文件中。
同样在这一点上,我不能 100% 确定 500 条记录是否是系统生成的总数,而只是此时报告的数量。
如何确保所有结果都写入 results.csv 文件?
不要等到所有进程都完成了再消费数据,而是同时处理数据,记住哪些进程还在运行:
processes = []
"""start processes and append them to processes"""
while True:
try:
# get an item
item = queue.get(True, 0.5)
except Queue.Empty:
# no item received in half a second
if not processes:
# there are no more processes and nothing left to process
break
else:
proc_num = 0
while proc_num < len(processes):
process = processes[proc_num]
exit_code = process.poll()
if exit_code is None:
# process is still running, proceed to next
proc_num += 1
elif exit_code == 0:
# process ended gracefully, remove it from list
processes.pop(proc_num)
else:
# process ended with an error, what now?
raise Exception('Her last words were: "%r"' % exit_code)
else:
# got an item
"""process item"""
不要测试 processes
在 Queue.Empty
情况之外是否为空,否则您将得到 races.
但是 higher level function 你可能会更开心:
pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items:
"""process item"""
我正在使用 python 多处理库来处理一组进程中的信息。这些流程还包含进一步划分必须完成的工作量的流程。有一个 Manager.Queue 累积所有使用数据的进程的结果。
在python脚本的主线程中。我尝试使用 join 来阻塞主线程,直到我们可以合理地确定是否所有子进程都已完成,然后将输出写入单个文件。然而,在所有数据写入文件之前,系统终止并且文件关闭。
以下代码是对上述解决方案实施的简化提取。 对于 inQueues 中的队列: queue.join()
for p in processes:
p.join()
print "At the end output has: " + str(out_queue.qsize()) + " records"
with open("results.csv", "w") as out_file:
out_file.write("Algorithm,result\n")
while not out_queue.empty():
res = out_queue.get()
out_file.write(res['algorithm'] + ","+res['result']+"\n")
out_queue.task_done()
time.sleep(0.05)
out_queue.join()
out_file.close()
out_queue.qsize() 将打印超过 500 条可用记录,但只有 100 条记录将打印到文件中。 同样在这一点上,我不能 100% 确定 500 条记录是否是系统生成的总数,而只是此时报告的数量。
如何确保所有结果都写入 results.csv 文件?
不要等到所有进程都完成了再消费数据,而是同时处理数据,记住哪些进程还在运行:
processes = []
"""start processes and append them to processes"""
while True:
try:
# get an item
item = queue.get(True, 0.5)
except Queue.Empty:
# no item received in half a second
if not processes:
# there are no more processes and nothing left to process
break
else:
proc_num = 0
while proc_num < len(processes):
process = processes[proc_num]
exit_code = process.poll()
if exit_code is None:
# process is still running, proceed to next
proc_num += 1
elif exit_code == 0:
# process ended gracefully, remove it from list
processes.pop(proc_num)
else:
# process ended with an error, what now?
raise Exception('Her last words were: "%r"' % exit_code)
else:
# got an item
"""process item"""
不要测试 processes
在 Queue.Empty
情况之外是否为空,否则您将得到 races.
但是 higher level function 你可能会更开心:
pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items:
"""process item"""