Python 读取文件的生成器上的多处理
Python multiprocessing on a generator that reads files in
我正在尝试读取和处理 1000 多个文件,但不幸的是,处理文件的时间大约是从磁盘读取文件的时间的 3 倍,所以我想在读取这些文件时进行处理在(并且当我继续阅读其他文件时)。
在一个完美的世界中,我有一个一次读取一个文件的生成器,我想将这个生成器传递给一组工作人员,这些工作人员在(缓慢地)生成项目时处理来自生成器的项目。
这是一个例子:
def process_file(file_string):
...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
上面代码唯一的问题是在pool开始之前所有的文件都读入了内存,这意味着我需要等待磁盘读入所有的东西,而且我也消耗了大量的内存.
Pool.map
和 Pool.map_async
list
ify the iterable
passed to them, so your generator will always be realized fully before processing even begins.
各种 Pool.imap*
函数 似乎 将输入作为生成器处理,因此您可以更改:
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
至:
# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))
并在处理前得到相同的结果,但 AFAICT,他们仍然会尝试尽可能快地完全填充队列,这可能会导致大量数据未处理和内存使用过多;除此之外,您将在一个进程中读取所有数据,然后通过 IPC 发送所有数据,这意味着您仍然在 I/O.
上遇到瓶颈
在你的位置,我会将读取移动到任务本身(如果可以的话,避免读取整个文件,改为按行或按块处理一次阅读整个内容)。你会得到并行读取,更少的 IPC,而且你不会冒险在前几个文件被处理之前吞噬所有文件;您打开的文件永远不会超过您的工作人员。所以最终结果看起来像:
def process_file(path):
with open(path, 'rb') as f:
file_string = f.read()
... same as before ...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
您正在将文件读取到 parent 的内存中,然后将有效负载传输到 children。那是相当低效的。只发送文件名,让 children 执行 I/O。如果结果是一堆您打算写入文件的文本,也请在 child 中执行此操作。
map
通常会一次性发出大块工作,以减少与其池工作人员的通信开销。这可能就是您获得大内存峰值的原因。仅传递文件名可以解决该问题,但当工作人员之间的处理时间不均时,设置较小的块大小仍然有益。
def process_file(filename):
with open(filename, 'rb') as fp:
file_string = fp.read()
...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, path+part for part in os.listdir(path)), chunksize=1)
我正在尝试读取和处理 1000 多个文件,但不幸的是,处理文件的时间大约是从磁盘读取文件的时间的 3 倍,所以我想在读取这些文件时进行处理在(并且当我继续阅读其他文件时)。
在一个完美的世界中,我有一个一次读取一个文件的生成器,我想将这个生成器传递给一组工作人员,这些工作人员在(缓慢地)生成项目时处理来自生成器的项目。
这是一个例子:
def process_file(file_string):
...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
上面代码唯一的问题是在pool开始之前所有的文件都读入了内存,这意味着我需要等待磁盘读入所有的东西,而且我也消耗了大量的内存.
Pool.map
和 Pool.map_async
list
ify the iterable
passed to them, so your generator will always be realized fully before processing even begins.
各种 Pool.imap*
函数 似乎 将输入作为生成器处理,因此您可以更改:
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
至:
# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))
并在处理前得到相同的结果,但 AFAICT,他们仍然会尝试尽可能快地完全填充队列,这可能会导致大量数据未处理和内存使用过多;除此之外,您将在一个进程中读取所有数据,然后通过 IPC 发送所有数据,这意味着您仍然在 I/O.
上遇到瓶颈在你的位置,我会将读取移动到任务本身(如果可以的话,避免读取整个文件,改为按行或按块处理一次阅读整个内容)。你会得到并行读取,更少的 IPC,而且你不会冒险在前几个文件被处理之前吞噬所有文件;您打开的文件永远不会超过您的工作人员。所以最终结果看起来像:
def process_file(path):
with open(path, 'rb') as f:
file_string = f.read()
... same as before ...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
您正在将文件读取到 parent 的内存中,然后将有效负载传输到 children。那是相当低效的。只发送文件名,让 children 执行 I/O。如果结果是一堆您打算写入文件的文本,也请在 child 中执行此操作。
map
通常会一次性发出大块工作,以减少与其池工作人员的通信开销。这可能就是您获得大内存峰值的原因。仅传递文件名可以解决该问题,但当工作人员之间的处理时间不均时,设置较小的块大小仍然有益。
def process_file(filename):
with open(filename, 'rb') as fp:
file_string = fp.read()
...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, path+part for part in os.listdir(path)), chunksize=1)