Python 使用进程的多处理:消耗大量内存
Python Multiprocessing using Process: Consuming Large Memory
我运行从单个 python 代码中宁多个进程:
代码片段:
while 1:
if sqsObject.msgCount() > 0:
ReadyMsg = sqsObject.readM2Q()
if ReadyMsg == 0:
continue
fileName = ReadyMsg['fileName']
dirName = ReadyMsg['dirName']
uuid = ReadyMsg['uid']
guid = ReadyMsg['guid']
callback = ReadyMsg['callbackurl']
# print ("Trigger Algorithm Process")
if(countProcess < maxProcess):
try:
retValue = Process(target=dosomething, args=(dirName, uuid,guid,callback))
processArray.append(retValue)
retValue.start()
countProcess = countProcess + 1
except:
print "Cannot Run Process"
else:
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
continue
else:
try:
#print 'Restart Process'
processArray[i] = Process(target=dosomething, args=(dirName,uuid,guid,callback))
processArray[i].start()
except:
print "Cannot Run Process"
else: # No more request to service
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
processRunning = 1
break
else:
continue
if processRunning == 0:
countProcess = 0
else:
processRunning = 0
在这里,我正在从队列中读取消息并创建一个进程以 运行 该消息的算法。我设置了 maxProcess 的上限。因此,在达到 maxProcess 后,我想通过检查 is_alive().
来重用不存在的 processArray 插槽
此进程 运行 适合较少数量的进程,但是,对于大量消息(例如 100 条),内存消耗会达到顶峰。我在想我通过重用进程槽发生泄漏。
不确定这个过程中有什么问题。
提前感谢您发现错误或提出明智的建议。
Not sure what is wrong in the process.
看来您正在创建与消息一样多的进程,即使已达到 maxProcess 计数。
I am thinking I have leak by reusing the process slots.
无需自己管理流程。只需使用 process pool:
# before your while loop starts
from multiprocessing import Pool
pool = Pool(processes=max_process)
while 1:
...
# instead of creating a new Process
res = pool.apply_async(dosomething,
args=(dirName,uuid,guid,callback))
# after the while loop has finished
# -- wait to finish
pool.close()
pool.join()
提交工作的方式
请注意,Pool
class 支持多种提交作业的方式:
- apply_async - 一次一条消息
- map_async - 一次一大块消息
如果消息到达速度足够快,最好收集其中的几个(比如一次收集 10 条或 100 条,具体取决于完成的实际处理)并使用 map
提交 "mini-batch"一次到目标函数:
...
while True:
messages = []
# build mini-batch of messages
while len(messages) < batch_size:
... # get message
messages.append((dirName,uuid,guid,callback))
pool.map_async(dosomething, messages)
为了避免 dosomething
留下的内存泄漏,您可以要求池在消耗了一定数量的消息后重新启动进程:
max_tasks = 5 # some sensible number
Pool(max_processes, maxtasksperchild=max_tasks)
分发
如果使用这种方法仍然超出内存容量,请考虑使用分布式方法,即添加更多机器。使用 Celery 这将是非常直接的,来自上面:
# tasks.py
@task
def dosomething(...):
... # same code as before
# driver.py
while True:
... # get messages as before
res = somefunc.apply_async(args=(dirName,uuid,guid,callback))
总之,您的代码很奇怪:-)
它不是an mvce,所以没有人可以测试它,但是看看它,你在内部循环中有这个(稍微简化的)结构:
if count < limit:
... start a new process, and increment count ...
else:
do things that can potentially start even more processes
(but never, ever, decrease count)
这似乎充其量是不明智的。
在任何地方都没有流程实例的 join()
调用。 (我们稍后会回到外循环及其 else
的情况。)
让我们更仔细地看一下内部循环的 else
案例代码:
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
抛开不必要的 == True
测试——这有点冒险,因为 is_alive()
方法没有明确承诺 return True
和 False
,只是布尔值起作用的东西——考虑 this description from the documentation(这个 link 转到 py2k 文档,但 py3k 是一样的,你的 print
语句暗示你的代码无论如何都是 py2k) :
is_alive()
Return whether the process is alive.
Roughly, a process object is alive from the moment the start()
method returns until the child process terminates.
因为我们看不到 dosomething
的代码,所以很难说这些东西是否会终止。他们可能会这样做(通过退出),但如果他们不这样做,或者不够快,我们可能会在这里遇到问题,我们只是在外循环中丢弃我们从队列中取出的消息。
如果它们确实终止,我们只是从数组中删除进程引用,通过覆盖它:
processArray[i] = Process(...)
processArray[i] 中的先前值被丢弃。不清楚您是否将其保存在其他任何地方,但如果没有,Process 实例将被丢弃,现在实际上 不可能 调用其 join()
方法。
一些 Python 数据结构倾向于在被放弃时自行清理(例如,打开流刷新输出并根据需要关闭),但多进程代码似乎不会自动加入()其子级。所以这可能是问题的根源。
最后,每当我们确实遇到外循环中的 else
情况时,我们都会对任何活动进程进行同样有点奇怪的搜索——顺便说一句,可以更清楚地写成:
if any(p.is_alive() for p in processArray):
只要我们不关心哪些 特定的 还活着,哪些不活着——如果 none 报告自己还活着,我们就会重置计数,但永远不要对变量 processArray
做任何事情,这样每个 processArray[i]
仍然持有 Process 实例的标识。 (所以至少我们可以对其中的每一个调用 join
,不包括任何因覆盖而丢失的内容。)
与其自己构建 Pool
,不如使用 multiprocess.Pool
及其 apply
和 apply_async
方法,如 .
我运行从单个 python 代码中宁多个进程:
代码片段:
while 1:
if sqsObject.msgCount() > 0:
ReadyMsg = sqsObject.readM2Q()
if ReadyMsg == 0:
continue
fileName = ReadyMsg['fileName']
dirName = ReadyMsg['dirName']
uuid = ReadyMsg['uid']
guid = ReadyMsg['guid']
callback = ReadyMsg['callbackurl']
# print ("Trigger Algorithm Process")
if(countProcess < maxProcess):
try:
retValue = Process(target=dosomething, args=(dirName, uuid,guid,callback))
processArray.append(retValue)
retValue.start()
countProcess = countProcess + 1
except:
print "Cannot Run Process"
else:
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
continue
else:
try:
#print 'Restart Process'
processArray[i] = Process(target=dosomething, args=(dirName,uuid,guid,callback))
processArray[i].start()
except:
print "Cannot Run Process"
else: # No more request to service
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
processRunning = 1
break
else:
continue
if processRunning == 0:
countProcess = 0
else:
processRunning = 0
在这里,我正在从队列中读取消息并创建一个进程以 运行 该消息的算法。我设置了 maxProcess 的上限。因此,在达到 maxProcess 后,我想通过检查 is_alive().
来重用不存在的 processArray 插槽此进程 运行 适合较少数量的进程,但是,对于大量消息(例如 100 条),内存消耗会达到顶峰。我在想我通过重用进程槽发生泄漏。
不确定这个过程中有什么问题。
提前感谢您发现错误或提出明智的建议。
Not sure what is wrong in the process.
看来您正在创建与消息一样多的进程,即使已达到 maxProcess 计数。
I am thinking I have leak by reusing the process slots.
无需自己管理流程。只需使用 process pool:
# before your while loop starts
from multiprocessing import Pool
pool = Pool(processes=max_process)
while 1:
...
# instead of creating a new Process
res = pool.apply_async(dosomething,
args=(dirName,uuid,guid,callback))
# after the while loop has finished
# -- wait to finish
pool.close()
pool.join()
提交工作的方式
请注意,Pool
class 支持多种提交作业的方式:
- apply_async - 一次一条消息
- map_async - 一次一大块消息
如果消息到达速度足够快,最好收集其中的几个(比如一次收集 10 条或 100 条,具体取决于完成的实际处理)并使用 map
提交 "mini-batch"一次到目标函数:
...
while True:
messages = []
# build mini-batch of messages
while len(messages) < batch_size:
... # get message
messages.append((dirName,uuid,guid,callback))
pool.map_async(dosomething, messages)
为了避免 dosomething
留下的内存泄漏,您可以要求池在消耗了一定数量的消息后重新启动进程:
max_tasks = 5 # some sensible number
Pool(max_processes, maxtasksperchild=max_tasks)
分发
如果使用这种方法仍然超出内存容量,请考虑使用分布式方法,即添加更多机器。使用 Celery 这将是非常直接的,来自上面:
# tasks.py
@task
def dosomething(...):
... # same code as before
# driver.py
while True:
... # get messages as before
res = somefunc.apply_async(args=(dirName,uuid,guid,callback))
总之,您的代码很奇怪:-)
它不是an mvce,所以没有人可以测试它,但是看看它,你在内部循环中有这个(稍微简化的)结构:
if count < limit:
... start a new process, and increment count ...
else:
do things that can potentially start even more processes
(but never, ever, decrease count)
这似乎充其量是不明智的。
在任何地方都没有流程实例的 join()
调用。 (我们稍后会回到外循环及其 else
的情况。)
让我们更仔细地看一下内部循环的 else
案例代码:
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
抛开不必要的 == True
测试——这有点冒险,因为 is_alive()
方法没有明确承诺 return True
和 False
,只是布尔值起作用的东西——考虑 this description from the documentation(这个 link 转到 py2k 文档,但 py3k 是一样的,你的 print
语句暗示你的代码无论如何都是 py2k) :
is_alive()
Return whether the process is alive.
Roughly, a process object is alive from the moment the
start()
method returns until the child process terminates.
因为我们看不到 dosomething
的代码,所以很难说这些东西是否会终止。他们可能会这样做(通过退出),但如果他们不这样做,或者不够快,我们可能会在这里遇到问题,我们只是在外循环中丢弃我们从队列中取出的消息。
如果它们确实终止,我们只是从数组中删除进程引用,通过覆盖它:
processArray[i] = Process(...)
processArray[i] 中的先前值被丢弃。不清楚您是否将其保存在其他任何地方,但如果没有,Process 实例将被丢弃,现在实际上 不可能 调用其 join()
方法。
一些 Python 数据结构倾向于在被放弃时自行清理(例如,打开流刷新输出并根据需要关闭),但多进程代码似乎不会自动加入()其子级。所以这可能是问题的根源。
最后,每当我们确实遇到外循环中的 else
情况时,我们都会对任何活动进程进行同样有点奇怪的搜索——顺便说一句,可以更清楚地写成:
if any(p.is_alive() for p in processArray):
只要我们不关心哪些 特定的 还活着,哪些不活着——如果 none 报告自己还活着,我们就会重置计数,但永远不要对变量 processArray
做任何事情,这样每个 processArray[i]
仍然持有 Process 实例的标识。 (所以至少我们可以对其中的每一个调用 join
,不包括任何因覆盖而丢失的内容。)
与其自己构建 Pool
,不如使用 multiprocess.Pool
及其 apply
和 apply_async
方法,如