Multiprocessing a file in Python, then writing the result to disk

I want to do the following: read a file, process each line across multiple worker processes, and write the results to disk.

I have tried piecing together this and this answer, with little success. The code for the second queue never gets called, so nothing is ever written to disk. How do I make the worker processes aware of the second queue?

Note that I'm not necessarily a fan of multiprocessing. If async/await works better, I'm all for it.

My code so far:

import multiprocessing
import os
import time

in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()

def worker_main(in_queue, out_queue):
    print(os.getpid(), "working")
    while True:
        item = in_queue.get(True)
        print(os.getpid(), "got", item)
        time.sleep(1)  # long network processing
        print(os.getpid(), "done", item)
        # put the processed item on the queue to be written to disk
        out_queue.put("processed:" + str(item))


pool = multiprocessing.Pool(3, worker_main, (in_queue, out_queue))

for i in range(5): # let's assume this is the file reading part
    in_queue.put(i)

with open('out.txt', 'w') as file:

    while not out_queue.empty():
        try:
            value = out_queue.get(timeout=1)
            file.write(value + '\n')
        except Exception as qe:
            print("Empty queue or dead process")

The first problem I ran into when trying to run your code was:

An attempt has been made to start a new process before the current process has finished 
its bootstrapping phase. This probably means that you are not using fork to start your 
child processes and you have forgotten to use the proper idiom in the main module

I had to wrap any module-scope instructions in the if __name__ == '__main__': idiom.
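
For illustration, here is a minimal sketch of that wrapping (the worker_main body is just a stand-in):

import multiprocessing

def worker_main(item):
    return item

if __name__ == '__main__':
    # without this guard, each spawned child re-imports the module and
    # would try to start its own pool, triggering the error above
    with multiprocessing.Pool(3) as pool:
        print(pool.map(worker_main, range(5)))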

Since your goal is to iterate over the lines of a file, Pool.imap() seems like a good fit. The imap() documentation refers to the map() documentation, the difference being that imap() pulls the next items from the iterable lazily (in your case, that would be the csv file), which is beneficial if your csv file is large. From the map() documentation:

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.

imap() returns an iterator, so you can loop over the results produced by the pool workers and do whatever you need to do with them (in your case, write them to the file).

Here is a working example:

import multiprocessing
import os
import time


def worker_main(item):
    print(os.getpid(), "got", item)
    time.sleep(1) #long network processing
    print(os.getpid(), "done", item)
    # return the processed item to be written to disk
    return "processed:" + str(item)


if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        with open('out.txt', 'w') as file:
            # range(5) simulating a 5 row csv file.
            for proc_row in pool.imap(worker_main, range(5)):
                file.write(proc_row + '\n')

# printed output:
# 1368 got 0
# 9228 got 1
# 12632 got 2
# 1368 done 0
# 1368 got 3
# 9228 done 1
# 9228 got 4
# 12632 done 2
# 1368 done 3
# 9228 done 4

out.txt then looks like this:

processed:0
processed:1
processed:2
processed:3
processed:4
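
Since imap() consumes its iterable lazily, you can also pass an open file handle directly in place of range(5). A minimal sketch, assuming an input file named in.csv (the filename and the strip() call are illustrative):

import multiprocessing

def worker_main(line):
    # each line arrives with its trailing newline; strip it before processing
    return "processed:" + line.strip()

if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        with open('in.csv') as infile, open('out.txt', 'w') as outfile:
            # chunksize batches several lines per task, reducing
            # inter-process overhead on large files
            for proc_row in pool.imap(worker_main, infile, chunksize=100):
                outfile.write(proc_row + '\n')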

Note that I didn't have to use any queues either.
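
Finally, since you mention being open to async/await: if the per-item work is I/O-bound (network calls rather than CPU work), asyncio can do this in a single process. A minimal sketch, assuming the one-second sleep stands in for an awaitable network call:

import asyncio

async def worker_main(item):
    await asyncio.sleep(1)  # stands in for the long network processing
    return "processed:" + str(item)

async def main():
    # run all five items concurrently on one thread
    results = await asyncio.gather(*(worker_main(i) for i in range(5)))
    with open('out.txt', 'w') as file:
        for value in results:
            file.write(value + '\n')

if __name__ == '__main__':
    asyncio.run(main())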