Python 进程在 IO 完成之前终止

Python Process Terminate before IO completed

我正在使用 python 多处理库来处理一组进程中的信息。这些流程还包含进一步划分必须完成的工作量的流程。有一个 Manager.Queue 累积所有使用数据的进程的结果。

在python脚本的主线程中。我尝试使用 join 来阻塞主线程,直到我们可以合理地确定是否所有子进程都已完成,然后将输出写入单个文件。然而,在所有数据写入文件之前,系统终止并且文件关闭。

以下代码是对上述解决方案实施的简化提取。 对于 inQueues 中的队列: queue.join()

for p in processes:
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file:
    out_file.write("Algorithm,result\n")
    while not out_queue.empty():
        res = out_queue.get()
        out_file.write(res['algorithm'] + ","+res['result']+"\n")
        out_queue.task_done()
        time.sleep(0.05)
    out_queue.join()
    out_file.close()

out_queue.qsize() 将打印超过 500 条可用记录,但只有 100 条记录将打印到文件中。 同样在这一点上,我不能 100% 确定 500 条记录是否是系统生成的总数,而只是此时报告的数量。

如何确保所有结果都写入 results.csv 文件?

不要等到所有进程都完成了再消费数据,而是同时处理数据,记住哪些进程还在运行:

processes = []

"""start processes and append them to processes"""

while True:
    try:
        # get an item
        item = queue.get(True, 0.5)
    except Queue.Empty:
        # no item received in half a second
        if not processes:
            # there are no more processes and nothing left to process
            break
        else:
            proc_num = 0
            while proc_num < len(processes):
                process = processes[proc_num]
                exit_code = process.poll()
                if exit_code is None:
                    # process is still running, proceed to next
                    proc_num += 1
                elif exit_code == 0:
                    # process ended gracefully, remove it from list
                    processes.pop(proc_num)
                else:
                    # process ended with an error, what now?
                    raise Exception('Her last words were: "%r"' % exit_code)
    else:
        # got an item
        """process item"""

不要测试 processesQueue.Empty 情况之外是否为空,否则您将得到 races.

但是 higher level function 你可能会更开心:

pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items:
    """process item"""