Python: using multiprocessing to filter a massive file

I am trying to parallelize a file-filtering operation where each filter is a large regular expression, so the whole thing takes a while to run. The file itself is about 100 GB. The single-process version looks like this:

def func(line):
    # simple function as an example
    for i in range(10**7):
        pass
    return len(line) % 2 == 0


with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
    for line in in_sr:
        if func(line):
            out_sr.write(line)
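
For context, the question says each real filter is a large regular expression rather than the busy loop above, so a func closer to the actual workload might look like the following sketch (the pattern here is purely illustrative):

import re

# Purely illustrative pattern standing in for the much larger real regex.
PATTERN = re.compile(r'ERROR|WARN(?:ING)?|Traceback')

def func(line):
    # Keep the line only if the expensive regex matches somewhere in it.
    return PATTERN.search(line) is not None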

I tried using multiprocessing's imap, but it raises ValueError: I/O operation on closed file. I think the iterator is being copied to each process, but not all of the processes have that handle open.

Is there a way to do this with multiprocessing, preferably using a pool?

The code would look something like this:

def func(line):
    ...

if __name__ == '__main__':

    from multiprocessing import Pool
    from itertools import tee

    pool = Pool(processes=4)

    with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
        lines1, lines2 = tee(in_sr)
        for line, flag in zip(lines1, pool.imap(func, lines2)):
            if flag:
                out_sr.write(line)

I can run the code below without errors. Make sure you are not calling in_sr and out_sr outside of the with statement.

from multiprocessing import Pool

def func(line):
    # simple function as an example
    for i in range(10**7):
        pass
    return len(line) % 2 == 0, line

def main():
    with open('input.txt', 'r') as in_sr, open('output.txt', 'w') as out_sr:
        pool = Pool(processes=4)
        # imap yields results in input order, so output lines stay ordered.
        for ret, line in pool.imap(func, in_sr, chunksize=4):
            if ret:
                out_sr.write(line)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
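
As a side note on the answer's parameters: chunksize controls how many lines imap batches into each task, and pool.imap_unordered exists for when output order does not matter. Neither tweak is part of the original answer; the variant below is a sketch assuming order is irrelevant, with 256 as a purely illustrative chunksize:

from multiprocessing import Pool

def func(line):
    # Placeholder filter; the real one would apply the large regex.
    return len(line) % 2 == 0, line

def main():
    with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
        pool = Pool(processes=4)
        # Larger chunks mean fewer, bigger inter-process transfers;
        # 256 is an illustrative starting point, not a measured optimum.
        for ret, line in pool.imap_unordered(func, in_sr, chunksize=256):
            if ret:
                out_sr.write(line)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()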