Queue.get() 内存泄漏

Queue.get() memory leak

我使用以下代码在使用 get() 方法清空队列后产生了看起来像内存泄漏的情况。

import queue
import os
import psutil

def run(del_after_puts, del_after_gets, n_puts, process):

        mem = queue.Queue()
    
        for msg in range(n_puts):
            msg_put = f'{msg}_0000000000000000000000000000000000000000000000000000000000000333333333333331111111111'
            if msg % 1000000 == 0:
                print(f'puting  {msg} qsize {len(mem.queue)}')
            mem.put(msg_put)
    
    
        print(f'------ put done  ----- qsize {len(mem.queue)}')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')
    
        if del_after_puts:
            print(f'deleting queue after puts {mem}')
            del mem
            print(f'mem_pct {round(process.memory_percent(), 2)}% ')
            return
    
        for _ in range(n_puts):
            msg_get = mem.get()
            msg = int(msg_get.split('_')[0])
            if msg % 1000000 == 0:
                print(f'getting_q {msg} qsize {len(mem.queue)} ')
            mem.task_done()
            
        print(f'------ gets done  ----- qsize {len(mem.queue)}')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')
    
        if del_after_gets:
            print(f'deleting queue after gets {mem}')
            del mem
            print(f'mem_pct {round(process.memory_percent(), 2)}% ')
            return
    
    if __name__ == '__main__':
        del_after_puts = False
        del_after_gets = False
        n_puts = 20_000_000
        print()
        print('#########')
        print(f'del_after_puts {del_after_puts} del_after_gets {del_after_gets} n_puts {n_puts}')
    
        process = psutil.Process(os.getpid())
        print('before run')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')
    
        run(del_after_puts, del_after_gets, n_puts, process)
    
        print(f'after run return')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')

此脚本可以 运行 3 种方式:

  1. 将 n_puts 个元素添加到队列中,然后将其清空。
  2. 向队列中添加n_puts个元素,然后删除队列对象
  3. 向队列中添加 n_puts 个元素,然后将其清空,然后删除队列对象。

对于第一种和第三种情况,脚本似乎会产生内存泄漏,如下所示:

第一种情况,在将元素放入队列之前使用的内存为 0.15%,清空后为 2.22%:

#########

del_after_puts False del_after_gets False n_puts 20000000

before run

mem_pct 0.15%

------ put done ----- qsize 20000000

mem_pct 37.61%

------ gets done ----- qsize 0

mem_pct 2.22%

第三种情况,放入元素入队列前mem使用0.15%,清空后2.22%,删除对象后2.22%:

#########

del_after_puts False del_after_gets True n_puts 20000000

before run

mem_pct 0.15%

------ put done ----- qsize 20000000

mem_pct 37.61%

------ gets done ----- qsize 0

mem_pct 2.22%

deleting queue after gets <queue.Queue object at 0x7fbd87295a10>

mem_pct 2.22%

第2种情况,开始时mem_pct是0.15%,把所有元素都放入队列,直接删除后,是0.16%,差不多。

#########

del_after_puts True del_after_gets False n_puts 20000000

before run

mem_pct 0.15%

------ put done ----- qsize 20000000

mem_pct 37.61%

deleting queue after puts <queue.Queue object at 0x7f29084eca10>

mem_pct 0.16%

可以看出,内存returns只在第二种情况下才达到启动级别,当时只调用了queue.put(),因此似乎queue.get()产生了内存泄漏。

这在 python 3.7、3.8 和 3.9 中持续存在。

我已经尝试使用 tracemalloc 和 pympler 分析内存,但它们没有显示 python 级别的任何泄漏,所以我怀疑这可能是 C 级泄漏。

我在应用程序上使用队列和线程进行日志记录,这些线程被支持 运行 好几个星期了,队列似乎会导致泄漏并挂起我的应用程序。我能够追踪那里的泄漏,它们表明这似乎来自队列机制中使用的条件锁中的双端队列,但我从未在我的 [=56= 中看到任何 elements/waiters ]宁申请,所以去图。

threading.py:348:
waiters_to_notify = _deque(_islice(all_waiters, n))

无论如何,有没有办法缓解和处理这种队列泄漏? 谢谢

如果有人在寻找解决方案,泄漏的原因似乎是队列实现中使用的库存 python 双端队列。我已经从此处找到的库存中更改了 deque 实现:

https://github.com/kata198/python-cllist

然后重新定义队列class如下:

from cllist import dllist

class DllistQueue(queue.Queue):
    def _init(self, maxsize):
        self.queue = dllist()

那么新的class测试结果是:

#########

del_after_puts False del_after_gets True n_puts 20000000

before run

mem_pct 0.15%

puting 0 qsize 0

------ put done ----- qsize 20000000

mem_pct 47.35%

------ gets done ----- qsize 0

mem_pct 0.16%

deleting queue after gets <main.DllistQueue object at 0x7f3a197d8f90>

mem_pct 0.16%

所以这个链接列表的实现似乎没有泄漏,而且它和普通的一样快。唯一的缺点是它在 top usage 时会占用更多的内存。