python 使用多进程过滤海量文件
python using multiprocess to filter a massive file
我正在尝试并行化文件过滤操作,其中每个过滤器都是一个大正则表达式,因此整个过程需要时间 运行。该文件本身约为 100GB。单进程版本如下所示:
def func(line):
# simple function as an example
for i in range(10**7):
pass
return len(line) % 2 == 0
with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
for line in input:
if func(line):
out_sr.write(line)
我尝试使用 multiprocessing
的 imap
,但这给出了 ValueError: I/O operation on closed file.
我认为迭代器被复制到每个进程,但并非所有进程都打开了该句柄。
有没有办法使用 multiprocessing
来做到这一点,最好是使用池?
代码类似这样:
def func(line):
...
if __name__ == '__main__':
from multiprocessing import Pool
from itertools import tee, izip
pool = Pool(processes=4)
with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
lines1, lines2 = tee(in_sr)
for line, flag in izip(lines1, pool.imap(func, lines2)):
if flag:
out_sr.write(line)
我可以运行下面的代码没有错误。确保您没有在 with
语句之外调用 in_sr
和 out_sr
。
from multiprocessing import Pool
def func(line):
# simple function as an example
for i in xrange(10**7):
pass
return len(line) % 2 == 0, line
def main():
with open('input.txt','r') as in_sr, open('output.txt', 'w') as out_sr:
pool = Pool(processes=4)
for ret,line in pool.imap(func, in_sr, chunksize=4):
if ret:
out_sr.write(line)
pool.close()
if __name__ == '__main__':
main()
我正在尝试并行化文件过滤操作,其中每个过滤器都是一个大正则表达式,因此整个过程需要时间 运行。该文件本身约为 100GB。单进程版本如下所示:
def func(line):
# simple function as an example
for i in range(10**7):
pass
return len(line) % 2 == 0
with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
for line in input:
if func(line):
out_sr.write(line)
我尝试使用 multiprocessing
的 imap
,但这给出了 ValueError: I/O operation on closed file.
我认为迭代器被复制到每个进程,但并非所有进程都打开了该句柄。
有没有办法使用 multiprocessing
来做到这一点,最好是使用池?
代码类似这样:
def func(line):
...
if __name__ == '__main__':
from multiprocessing import Pool
from itertools import tee, izip
pool = Pool(processes=4)
with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
lines1, lines2 = tee(in_sr)
for line, flag in izip(lines1, pool.imap(func, lines2)):
if flag:
out_sr.write(line)
我可以运行下面的代码没有错误。确保您没有在 with
语句之外调用 in_sr
和 out_sr
。
from multiprocessing import Pool
def func(line):
# simple function as an example
for i in xrange(10**7):
pass
return len(line) % 2 == 0, line
def main():
with open('input.txt','r') as in_sr, open('output.txt', 'w') as out_sr:
pool = Pool(processes=4)
for ret,line in pool.imap(func, in_sr, chunksize=4):
if ret:
out_sr.write(line)
pool.close()
if __name__ == '__main__':
main()