Python 使用进程的多处理:消耗大量内存

Python Multiprocessing using Process: Consuming Large Memory

我运行从单个 python 代码中宁多个进程:

代码片段:

while 1:
   if sqsObject.msgCount() > 0:
        ReadyMsg = sqsObject.readM2Q()
        if ReadyMsg == 0:
            continue
        fileName = ReadyMsg['fileName']
        dirName  = ReadyMsg['dirName']
        uuid         = ReadyMsg['uid']
        guid         = ReadyMsg['guid']
        callback     = ReadyMsg['callbackurl']

        # print ("Trigger Algorithm Process")
        if(countProcess < maxProcess):

           try:
             retValue = Process(target=dosomething, args=(dirName, uuid,guid,callback))
             processArray.append(retValue)
             retValue.start()
             countProcess = countProcess + 1
           except:
             print "Cannot Run Process"
        else:
           for i in range(len(processArray)):
              if (processArray[i].is_alive() == True):
                 continue
              else:
                 try:
                    #print 'Restart Process'
                    processArray[i] = Process(target=dosomething, args=(dirName,uuid,guid,callback))
                    processArray[i].start()
                 except:
                    print "Cannot Run Process"


   else: # No more request to service

       for i in range(len(processArray)):
            if (processArray[i].is_alive() == True):
                processRunning = 1
                break
            else:
                continue

      if processRunning == 0:
           countProcess = 0

      else:
           processRunning = 0

在这里,我正在从队列中读取消息并创建一个进程以 运行 该消息的算法。我设置了 maxProcess 的上限。因此,在达到 maxProcess 后,我想通过检查 is_alive().

来重用不存在的 processArray 插槽

此进程 运行 适合较少数量的进程,但是,对于大量消息(例如 100 条),内存消耗会达到顶峰。我在想我通过重用进程槽发生泄漏。

不确定这个过程中有什么问题。

提前感谢您发现错误或提出明智的建议。

Not sure what is wrong in the process.

看来您正在创建与消息一样多的进程,即使已达到 maxProcess 计数。

I am thinking I have leak by reusing the process slots.

无需自己管理流程。只需使用 process pool:

 # before your while loop starts
 from multiprocessing import Pool
 pool = Pool(processes=max_process)
 while 1:
   ...
   # instead of creating a new Process
   res = pool.apply_async(dosomething, 
                          args=(dirName,uuid,guid,callback)) 
 # after the while loop has finished
 # -- wait to finish
 pool.close()
 pool.join()

提交工作的方式

请注意,Pool class 支持多种提交作业的方式:

  • apply_async - 一次一条消息
  • map_async - 一次一大块消息

如果消息到达速度足够快,最好收集其中的几个(比如一次收集 10 条或 100 条,具体取决于完成的实际处理)并使用 map 提交 "mini-batch"一次到目标函数:

...
while True:
    messages = []
    # build mini-batch of messages
    while len(messages) < batch_size:
        ... # get message
        messages.append((dirName,uuid,guid,callback))
    pool.map_async(dosomething, messages)

为了避免 dosomething 留下的内存泄漏,您可以要求池在消耗了一定数量的消息后重新启动进程:

max_tasks = 5 # some sensible number
Pool(max_processes, maxtasksperchild=max_tasks)

分发

如果使用这种方法仍然超出内存容量,请考虑使用分布式方法,即添加更多机器。使用 Celery 这将是非常直接的,来自上面:

# tasks.py
@task
def dosomething(...):
   ... # same code as before

# driver.py
  while True:
     ... # get messages as before
     res = somefunc.apply_async(args=(dirName,uuid,guid,callback))  

总之,您的代码很奇怪:-)

它不是an mvce,所以没有人可以测试它,但是看看它,你在内部循环中有这个(稍微简化的)结构:

if count < limit:
    ... start a new process, and increment count ...
else:
    do things that can potentially start even more processes
    (but never, ever, decrease count)

这似乎充其量是不明智的。

在任何地方都没有流程实例的 join() 调用。 (我们稍后会回到外循环及其 else 的情况。)

让我们更仔细地看一下内部循环的 else 案例代码:

   for i in range(len(processArray)):
        if (processArray[i].is_alive() == True):

抛开不必要的 == True 测试——这有点冒险,因为 is_alive() 方法没有明确承诺 return TrueFalse,只是布尔值起作用的东西——考虑 this description from the documentation(这个 link 转到 py2k 文档,但 py3k 是一样的,你的 print 语句暗示你的代码无论如何都是 py2k) :

is_alive()

Return whether the process is alive.

Roughly, a process object is alive from the moment the start() method returns until the child process terminates.

因为我们看不到 dosomething 的代码,所以很难说这些东西是否会终止。他们可能会这样做(通过退出),但如果他们不这样做,或者不够快,我们可能会在这里遇到问题,我们只是在外循环中丢弃我们从队列中取出的消息。

如果它们确实终止,我们只是从数组中删除进程引用,通过覆盖它:

            processArray[i] = Process(...)

processArray[i] 中的先前值被丢弃。不清楚您是否将其保存在其他任何地方,但如果没有,Process 实例将被丢弃,现在实际上 不可能 调用其 join() 方法。

一些 Python 数据结构倾向于在被放弃时自行清理(例如,打开流刷新输出并根据需要关闭),但多进程代码似乎不会自动加入()其子级。所以这可能是问题的根源。

最后,每当我们确实遇到外循环中的 else 情况时,我们都会对任何活动进程进行同样有点奇怪的搜索——顺便说一句,可以更清楚地写成:

if any(p.is_alive() for p in processArray):

只要我们不关心哪些 特定的 还活着,哪些不活着——如果 none 报告自己还活着,我们就会重置计数,但永远不要对变量 processArray 做任何事情,这样每个 processArray[i] 仍然持有 Process 实例的标识。 (所以至少我们可以对其中的每一个调用 join,不包括任何因覆盖而丢失的内容。)

与其自己构建 Pool,不如使用 multiprocess.Pool 及其 applyapply_async 方法,如 .