多处理——线程池内存泄漏?
Multiprocessing -- Thread Pool Memory Leak?
我正在观察无法向自己解释的内存使用情况。下面我提供了我的实际代码的精简版本,它仍然表现出这种行为。该代码旨在完成以下任务:
以 1000 行为单位读取文本文件。每一行都是一个句子。将这 1000 个句子分成 4 个生成器。将这些生成器传递给线程池,并在 250 个句子上并行提取 运行 特征。
在我的实际代码中,我从整个文件的所有句子中积累特征和标签。
现在奇怪的事情来了:即使没有累积这些值,内存也会被分配但不会再次释放!而且我觉得跟线程池有关系。总共占用的内存量取决于为任何给定单词提取的特征数量。我在这里用 range(100)
来模拟这个。看看:
from sys import argv
from itertools import chain, islice
from multiprocessing import Pool
from math import ceil
# dummyfied feature extraction function
# the lengt of the range determines howmuch mamory is used up in total,
# eventhough the objects are never stored
def features_from_sentence(sentence):
return [{'some feature' 'some value'} for i in range(100)], ['some label' for i in range(100)]
# split iterable into generator of generators of length `size`
def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, size - 1))
def features_from_sentence_meta(l):
return list(map (features_from_sentence, l))
def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# split sentences into a generator of 4 generators
sentence_chunks = chunks(sentences, ceil(50000/4))
# results is a list containing the lists of pairs of X and Y of all chunks
results = map(lambda x : x[0], pool.map(features_from_sentence_meta, sentence_chunks))
X, Y = zip(*results)
print(f'end: {i}')
return X, Y
# reads file in chunks of `lines_per_chunk` lines
def line_chunks(textfile, lines_per_chunk=1000):
chunk = []
i = 0
with open(textfile, 'r') as textfile:
for line in textfile:
if not line.split(): continue
i+=1
chunk.append(line.strip())
if i == lines_per_chunk:
yield chunk
i = 0
chunk = []
yield chunk
textfile = argv[1]
for i, line_chunk in enumerate(line_chunks(textfile)):
# stop processing file after 10 chunks to demonstrate
# that memory stays occupied (check your system monitor)
if i == 10:
while True:
pass
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
我用来调试的文件有 50000 行非空行,这就是我在一个地方使用硬编码 50000 行的原因。如果你想用同一个文件,为了方便他是一个link:
https://www.dropbox.com/s/v7nxb7vrrjim349/de_wiki_50000_lines?dl=0
现在,当你 运行 这个脚本并打开你的系统监视器时,你会观察到内存已用完并且使用量一直持续到第 10 个块,我在此处人为地进入无限循环以证明内存一直在使用,即使我从不存储任何东西。
你能解释一下为什么会这样吗?我似乎遗漏了一些关于应该如何使用多处理池的信息。
首先,让我们澄清一些误解——尽管事实证明,这实际上并不是一开始就探索的正确途径。
当您在 Python 中分配内存时,它当然必须从 OS 中获取内存。
然而,当您释放内存时,它很少返回到 OS,直到您最终退出。相反,它进入 "free list"——或者,实际上,进入不同目的的多级空闲列表。这意味着下次您需要内存时,Python 已经拥有它,并且可以立即找到它,而无需与 OS 联系以分配更多内存。这通常会使内存密集型程序更快。
但这也意味着——尤其是在现代 64 位操作系统上——试图通过查看 Activity Monitor/Task [=82= 来了解您是否真的有任何内存压力问题].几乎没用。
tracemalloc
module in the standard library provides low-level tools to see what actually is going on with your memory usage. At a higher level, you can use something like memory_profiler
,它(如果启用 tracemalloc
支持——这很重要)可以将该信息与来自 psutil
等来源的 OS 级信息放在一起弄清楚事情的进展。
但是,如果您没有看到任何实际问题——您的系统没有进入交换地狱,您没有得到任何 MemoryError
异常,您的性能没有遇到一些奇怪的悬崖它线性扩展到 N,然后在 N+1 时突然变得糟糕,等等——你通常一开始就不需要理会这些。
如果您确实发现了一个问题,那么幸运的是,您已经解决了一半。正如我在顶部提到的,您分配的大部分内存在您最终退出之前不会返回到 OS。但是,如果您所有的内存使用都发生在子进程中,并且这些子进程没有状态,您可以随时让它们退出并重新启动。
当然,这样做会产生性能成本——进程拆卸和启动时间,页面映射和缓存必须重新开始,并要求 OS 再次分配内存,等等。而且还有一个复杂性成本——你不能只 运行 一个池并让它做它的事情;你必须参与它的事情,让它为你回收流程。
multiprocessing.Pool
class 中不支持这样做。
您当然可以构建自己的 Pool
。如果你想花哨一点,你可以查看 multiprocessing
的源代码并按照它的要求去做。或者您可以从 Process
对象列表和一对 Queue
对象中构建一个简单的池。或者你可以直接使用 Process
个对象而不用抽象池。
您可能遇到内存问题的另一个原因是您的各个进程都很好,但它们太多了。
事实上,这里似乎就是这种情况。
您在此函数中创建了一个 Pool
的 4 名工人:
def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# ...
… 然后你为每个块调用这个函数:
for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
因此,每个块最终有 4 个新进程。即使每个人的内存使用率都非常低,一次拥有数百个也会加起来。
更不用说让数百个进程竞争 4 个内核可能会严重损害您的时间性能,因此您将时间浪费在上下文切换和 OS 调度上,而不是做真正的工作。
正如您在评论中指出的那样,解决此问题的方法很简单:只需为每个调用创建一个全局 pool
而不是新的。
很抱歉让所有 Columbo 都在这里,但是......还有一件事......这段代码 运行s 在你的模块的顶层:
for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
… 这就是尝试启动池和所有子任务的代码。但是该池中的每个子进程都需要 import
这个模块,这意味着它们都将以 运行 相同的代码结束,并启动另一个池和一整套额外的子任务.
您可能 运行 在 Linux 或 macOS 上使用此功能,其中默认 startmethod
是 fork
,这意味着 multiprocessing
可以避免这个import
,所以你没有问题。但是如果使用其他启动方法,这段代码基本上就是一个会耗尽所有系统资源的叉子炸弹。其中包括 spawn
,这是 Windows 上的默认启动方法。因此,如果任何人可能 运行 Windows 上的此代码,您应该将所有顶级代码放在 if __name__ == '__main__':
守卫中。
我正在观察无法向自己解释的内存使用情况。下面我提供了我的实际代码的精简版本,它仍然表现出这种行为。该代码旨在完成以下任务:
以 1000 行为单位读取文本文件。每一行都是一个句子。将这 1000 个句子分成 4 个生成器。将这些生成器传递给线程池,并在 250 个句子上并行提取 运行 特征。
在我的实际代码中,我从整个文件的所有句子中积累特征和标签。
现在奇怪的事情来了:即使没有累积这些值,内存也会被分配但不会再次释放!而且我觉得跟线程池有关系。总共占用的内存量取决于为任何给定单词提取的特征数量。我在这里用 range(100)
来模拟这个。看看:
from sys import argv
from itertools import chain, islice
from multiprocessing import Pool
from math import ceil
# dummyfied feature extraction function
# the lengt of the range determines howmuch mamory is used up in total,
# eventhough the objects are never stored
def features_from_sentence(sentence):
return [{'some feature' 'some value'} for i in range(100)], ['some label' for i in range(100)]
# split iterable into generator of generators of length `size`
def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, size - 1))
def features_from_sentence_meta(l):
return list(map (features_from_sentence, l))
def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# split sentences into a generator of 4 generators
sentence_chunks = chunks(sentences, ceil(50000/4))
# results is a list containing the lists of pairs of X and Y of all chunks
results = map(lambda x : x[0], pool.map(features_from_sentence_meta, sentence_chunks))
X, Y = zip(*results)
print(f'end: {i}')
return X, Y
# reads file in chunks of `lines_per_chunk` lines
def line_chunks(textfile, lines_per_chunk=1000):
chunk = []
i = 0
with open(textfile, 'r') as textfile:
for line in textfile:
if not line.split(): continue
i+=1
chunk.append(line.strip())
if i == lines_per_chunk:
yield chunk
i = 0
chunk = []
yield chunk
textfile = argv[1]
for i, line_chunk in enumerate(line_chunks(textfile)):
# stop processing file after 10 chunks to demonstrate
# that memory stays occupied (check your system monitor)
if i == 10:
while True:
pass
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
我用来调试的文件有 50000 行非空行,这就是我在一个地方使用硬编码 50000 行的原因。如果你想用同一个文件,为了方便他是一个link:
https://www.dropbox.com/s/v7nxb7vrrjim349/de_wiki_50000_lines?dl=0
现在,当你 运行 这个脚本并打开你的系统监视器时,你会观察到内存已用完并且使用量一直持续到第 10 个块,我在此处人为地进入无限循环以证明内存一直在使用,即使我从不存储任何东西。
你能解释一下为什么会这样吗?我似乎遗漏了一些关于应该如何使用多处理池的信息。
首先,让我们澄清一些误解——尽管事实证明,这实际上并不是一开始就探索的正确途径。
当您在 Python 中分配内存时,它当然必须从 OS 中获取内存。
然而,当您释放内存时,它很少返回到 OS,直到您最终退出。相反,它进入 "free list"——或者,实际上,进入不同目的的多级空闲列表。这意味着下次您需要内存时,Python 已经拥有它,并且可以立即找到它,而无需与 OS 联系以分配更多内存。这通常会使内存密集型程序更快。
但这也意味着——尤其是在现代 64 位操作系统上——试图通过查看 Activity Monitor/Task [=82= 来了解您是否真的有任何内存压力问题].几乎没用。
tracemalloc
module in the standard library provides low-level tools to see what actually is going on with your memory usage. At a higher level, you can use something like memory_profiler
,它(如果启用 tracemalloc
支持——这很重要)可以将该信息与来自 psutil
等来源的 OS 级信息放在一起弄清楚事情的进展。
但是,如果您没有看到任何实际问题——您的系统没有进入交换地狱,您没有得到任何 MemoryError
异常,您的性能没有遇到一些奇怪的悬崖它线性扩展到 N,然后在 N+1 时突然变得糟糕,等等——你通常一开始就不需要理会这些。
如果您确实发现了一个问题,那么幸运的是,您已经解决了一半。正如我在顶部提到的,您分配的大部分内存在您最终退出之前不会返回到 OS。但是,如果您所有的内存使用都发生在子进程中,并且这些子进程没有状态,您可以随时让它们退出并重新启动。
当然,这样做会产生性能成本——进程拆卸和启动时间,页面映射和缓存必须重新开始,并要求 OS 再次分配内存,等等。而且还有一个复杂性成本——你不能只 运行 一个池并让它做它的事情;你必须参与它的事情,让它为你回收流程。
multiprocessing.Pool
class 中不支持这样做。
您当然可以构建自己的 Pool
。如果你想花哨一点,你可以查看 multiprocessing
的源代码并按照它的要求去做。或者您可以从 Process
对象列表和一对 Queue
对象中构建一个简单的池。或者你可以直接使用 Process
个对象而不用抽象池。
您可能遇到内存问题的另一个原因是您的各个进程都很好,但它们太多了。
事实上,这里似乎就是这种情况。
您在此函数中创建了一个 Pool
的 4 名工人:
def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# ...
… 然后你为每个块调用这个函数:
for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
因此,每个块最终有 4 个新进程。即使每个人的内存使用率都非常低,一次拥有数百个也会加起来。
更不用说让数百个进程竞争 4 个内核可能会严重损害您的时间性能,因此您将时间浪费在上下文切换和 OS 调度上,而不是做真正的工作。
正如您在评论中指出的那样,解决此问题的方法很简单:只需为每个调用创建一个全局 pool
而不是新的。
很抱歉让所有 Columbo 都在这里,但是......还有一件事......这段代码 运行s 在你的模块的顶层:
for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
… 这就是尝试启动池和所有子任务的代码。但是该池中的每个子进程都需要 import
这个模块,这意味着它们都将以 运行 相同的代码结束,并启动另一个池和一整套额外的子任务.
您可能 运行 在 Linux 或 macOS 上使用此功能,其中默认 startmethod
是 fork
,这意味着 multiprocessing
可以避免这个import
,所以你没有问题。但是如果使用其他启动方法,这段代码基本上就是一个会耗尽所有系统资源的叉子炸弹。其中包括 spawn
,这是 Windows 上的默认启动方法。因此,如果任何人可能 运行 Windows 上的此代码,您应该将所有顶级代码放在 if __name__ == '__main__':
守卫中。