为什么 `multiprocessing.Queue.get` 这么慢?

Why is `multiprocessing.Queue.get` so slow?

我需要帮助来理解 multiprocessing.Queue。我面临的问题是,与调用 queue.put(...) 和队列的缓冲区(双端队列)相比,从 queue.get(...) 获取结果的速度非常落后。

这种泄漏的抽象让我调查了队列的内部结构。它很简单 source code just points me to the deque implementation,而且看起来也很简单,我无法用它来解释我所看到的行为。我还读到 Queue 使用管道,但我似乎无法在源代码中找到它。

我已将其归结为重现问题的最小示例,并在其下方指定了可能的输出。

import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process Doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))

输出:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28

关于这个结果,请大家注意:插入第28001个元素后,worker发现队列中没有元素了,反之,还有几十个。由于同步,我可以只获取其中的一部分。但它只能找到 两个!

这种模式还在继续。

这似乎与我放入队列的对象的大小有关。对于小对象,说 i 而不是 list(range(i)),就不会出现这个问题。但是所讨论的对象的大小仍然是千字节,不足以证明如此严重的延迟(在我的真实世界的非最小示例中,这很容易花费几分钟)

我的问题具体是:如何在Python中的进程之间共享(不是这样)大量数据? 此外,我想知道在 Queue 的内部实现中,这种迟缓来自哪里

我也遇到了这个问题。我正在发送大型 numpy 数组(~300MB),它在 mp.queue.get().

时太慢了

在查看了 mp.Queue 的 python2.7 源代码后,我发现最慢的部分(在类 unix 系统上)是 _conn_recvall() 中的 socket_connection.c , 但我没有深入研究。

为了解决这个问题,我构建了一个实验包 FMQ

This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data item because of the speed limitation of pipe (on Unix-like systems).

With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread, which steals an item from mp.Queue once any item is available, and puts it into a Queue.Queue. Then, the consumer process can fetch the data from the Queue.Queue immediately.

The speed-up is based on the assumption that both producer and consumer processes are compute-intensive (thus multiprocessing is neccessary) and the data is large (eg. >50 227x227 images). Otherwise mp.Queue with multiprocessing or Queue.Queue with threading is good enough.

fmq.Queue 像 mp.Queue.

一样容易使用

请注意还有一些 Known Issues,因为这个项目还处于早期阶段。

对于未来的读者,您还可以尝试使用:

q = multiprocessing.Manager().Queue()

而不只是

q = multiprocessing.Queue()

我还没有完全提炼和理解这种行为背后的机制,但我读过的一篇 source 声称它是关于:

"when pushing large items onto the queue, the items are essentially buffered, despite the immediate return of the queue’s put function."

作者继续解释更多关于它的信息和修复方法,但对我来说,添加管理器使技巧变得简单而干净。

更新:我相信有助于解释这个问题。

已接受的答案中提到的 FMQ 也是 Python2 独家的,这也是我觉得这个答案有朝一日可能会帮助更多人的原因之一。