Python3:多进程消耗大量 RAM 并减慢速度
Python3: Multiprocessing consumes extensively much RAM and slows down
我启动了多个进程以创建新对象列表。 htop
向我展示了 1 到 4 个进程(我总是创建 3 个新对象)。
def foo(self):
with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, self.information)
self.new_objs = result.get()
pool.terminate()
gc.collect()
我调用了很多次foo()
,每次调用,整个过程都运行慢,最后程序都没有执行完,慢了很多。该程序开始耗尽我所有的 RAM,而顺序方法没有任何显着的 RAM 使用。
当我终止程序时,大多数时候这是程序上次执行的功能。
->File "threading.py", line 293, in wait
waiter.acquire()
编辑
提供一些关于我的情况的信息。我创建了一棵由节点组成的树。 foo()
由父节点调用以创建其子节点。进程返回的result
就是这些子节点。这些保存在父节点的列表中。我想并行创建这些子节点,而不是按顺序创建它们。
我认为您的问题主要与以下事实有关:您的并行函数是对象的 方法 。没有更多信息很难确定,但请考虑这个小玩具程序:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
现在 Container
只被调用一次,所以您会看到一个 "Born"
,然后是一个 "Dead"
被打印出来;但是由于进程正在执行的代码是容器的方法,这意味着整个容器必须在别处执行! 运行 这个,你会看到 "Born"
和 "Dead"
的混合流,因为你的容器在每次 执行 映射时重建:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
为了让自己相信整个容器每次都被复制和发送,尝试设置一些不可序列化的值:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
这将立即引发 AttributeError
,因为它无法序列化容器。
总结一下:当向池发送 1000 个请求时,Container
将被序列化,发送到进程并在那里反序列化 1000 次。当然,它们最终会被删除(假设没有发生太多奇怪的交叉引用),但这肯定会给 RAM 带来很大压力,因为对象被序列化、调用、更新、重新序列化……对于每个映射输入中的元素。
你怎么解决这个问题?好吧,理想情况下,不要共享状态:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
这只需要一小部分时间就可以完成,并且只会在 RAM 上产生最小的飞艇(因为曾经建造过一个 Container
)。如果您需要将一些内部状态传递到那里,请将其提取并发送:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
这与以前的行为相同。如果您 绝对 无法避免在进程之间共享一些可变状态,请查看 the multiprocessing tools 这样做而不必每次都复制所有内容。
我启动了多个进程以创建新对象列表。 htop
向我展示了 1 到 4 个进程(我总是创建 3 个新对象)。
def foo(self):
with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, self.information)
self.new_objs = result.get()
pool.terminate()
gc.collect()
我调用了很多次foo()
,每次调用,整个过程都运行慢,最后程序都没有执行完,慢了很多。该程序开始耗尽我所有的 RAM,而顺序方法没有任何显着的 RAM 使用。
当我终止程序时,大多数时候这是程序上次执行的功能。
->File "threading.py", line 293, in wait
waiter.acquire()
编辑
提供一些关于我的情况的信息。我创建了一棵由节点组成的树。 foo()
由父节点调用以创建其子节点。进程返回的result
就是这些子节点。这些保存在父节点的列表中。我想并行创建这些子节点,而不是按顺序创建它们。
我认为您的问题主要与以下事实有关:您的并行函数是对象的 方法 。没有更多信息很难确定,但请考虑这个小玩具程序:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
现在 Container
只被调用一次,所以您会看到一个 "Born"
,然后是一个 "Dead"
被打印出来;但是由于进程正在执行的代码是容器的方法,这意味着整个容器必须在别处执行! 运行 这个,你会看到 "Born"
和 "Dead"
的混合流,因为你的容器在每次 执行 映射时重建:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
为了让自己相信整个容器每次都被复制和发送,尝试设置一些不可序列化的值:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
这将立即引发 AttributeError
,因为它无法序列化容器。
总结一下:当向池发送 1000 个请求时,Container
将被序列化,发送到进程并在那里反序列化 1000 次。当然,它们最终会被删除(假设没有发生太多奇怪的交叉引用),但这肯定会给 RAM 带来很大压力,因为对象被序列化、调用、更新、重新序列化……对于每个映射输入中的元素。
你怎么解决这个问题?好吧,理想情况下,不要共享状态:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
这只需要一小部分时间就可以完成,并且只会在 RAM 上产生最小的飞艇(因为曾经建造过一个 Container
)。如果您需要将一些内部状态传递到那里,请将其提取并发送:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
这与以前的行为相同。如果您 绝对 无法避免在进程之间共享一些可变状态,请查看 the multiprocessing tools 这样做而不必每次都复制所有内容。