为什么 `multiprocessing.Queue.get` 这么慢?
Why is `multiprocessing.Queue.get` so slow?
我需要帮助来理解 multiprocessing.Queue
。我面临的问题是,与调用 queue.put(...)
和队列的缓冲区(双端队列)相比,从 queue.get(...)
获取结果的速度非常落后。
这种泄漏的抽象让我调查了队列的内部结构。它很简单 source code just points me to the deque implementation,而且看起来也很简单,我无法用它来解释我所看到的行为。我还读到 Queue 使用管道,但我似乎无法在源代码中找到它。
我已将其归结为重现问题的最小示例,并在其下方指定了可能的输出。
import threading
import multiprocessing
import queue
q = None
def enqueue(item):
global q
if q is None:
q = multiprocessing.Queue()
process = threading.Thread(target=worker, args=(q,)) # or multiprocessing.Process Doesn't matter
process.start()
q.put(item)
print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
def worker(local_queue):
while True:
try:
while True: # get all items
item = local_queue.get(block=False)
print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
except queue.Empty:
print('empty')
if __name__ == '__main__':
for i in range(1, 100000, 1000):
enqueue(list(range(i)))
输出:
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
关于这个结果,请大家注意:插入第28001个元素后,worker发现队列中没有元素了,反之,还有几十个。由于同步,我可以只获取其中的一部分。但它只能找到 两个!
这种模式还在继续。
这似乎与我放入队列的对象的大小有关。对于小对象,说 i
而不是 list(range(i))
,就不会出现这个问题。但是所讨论的对象的大小仍然是千字节,不足以证明如此严重的延迟(在我的真实世界的非最小示例中,这很容易花费几分钟)
我的问题具体是:如何在Python中的进程之间共享(不是这样)大量数据?
此外,我想知道在 Queue 的内部实现中,这种迟缓来自哪里
我也遇到了这个问题。我正在发送大型 numpy 数组(~300MB),它在 mp.queue.get().
时太慢了
在查看了 mp.Queue 的 python2.7 源代码后,我发现最慢的部分(在类 unix 系统上)是 _conn_recvall()
中的 socket_connection.c , 但我没有深入研究。
为了解决这个问题,我构建了一个实验包 FMQ。
This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data item because of the speed limitation of pipe (on Unix-like systems).
With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread, which steals an item from mp.Queue once any item is available, and puts it into a Queue.Queue. Then, the consumer process can fetch the data from the Queue.Queue immediately.
The speed-up is based on the assumption that both producer and consumer processes are compute-intensive (thus multiprocessing is neccessary) and the data is large (eg. >50 227x227 images). Otherwise mp.Queue with multiprocessing or Queue.Queue with threading is good enough.
fmq.Queue 像 mp.Queue.
一样容易使用
请注意还有一些 Known Issues,因为这个项目还处于早期阶段。
对于未来的读者,您还可以尝试使用:
q = multiprocessing.Manager().Queue()
而不只是
q = multiprocessing.Queue()
我还没有完全提炼和理解这种行为背后的机制,但我读过的一篇 source 声称它是关于:
"when pushing large items onto the queue, the items are essentially
buffered, despite the immediate return of the queue’s put function."
作者继续解释更多关于它的信息和修复方法,但对我来说,添加管理器使技巧变得简单而干净。
更新:我相信有助于解释这个问题。
已接受的答案中提到的 FMQ 也是 Python2 独家的,这也是我觉得这个答案有朝一日可能会帮助更多人的原因之一。
我需要帮助来理解 multiprocessing.Queue
。我面临的问题是,与调用 queue.put(...)
和队列的缓冲区(双端队列)相比,从 queue.get(...)
获取结果的速度非常落后。
这种泄漏的抽象让我调查了队列的内部结构。它很简单 source code just points me to the deque implementation,而且看起来也很简单,我无法用它来解释我所看到的行为。我还读到 Queue 使用管道,但我似乎无法在源代码中找到它。
我已将其归结为重现问题的最小示例,并在其下方指定了可能的输出。
import threading
import multiprocessing
import queue
q = None
def enqueue(item):
global q
if q is None:
q = multiprocessing.Queue()
process = threading.Thread(target=worker, args=(q,)) # or multiprocessing.Process Doesn't matter
process.start()
q.put(item)
print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
def worker(local_queue):
while True:
try:
while True: # get all items
item = local_queue.get(block=False)
print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
except queue.Empty:
print('empty')
if __name__ == '__main__':
for i in range(1, 100000, 1000):
enqueue(list(range(i)))
输出:
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
关于这个结果,请大家注意:插入第28001个元素后,worker发现队列中没有元素了,反之,还有几十个。由于同步,我可以只获取其中的一部分。但它只能找到 两个!
这种模式还在继续。
这似乎与我放入队列的对象的大小有关。对于小对象,说 i
而不是 list(range(i))
,就不会出现这个问题。但是所讨论的对象的大小仍然是千字节,不足以证明如此严重的延迟(在我的真实世界的非最小示例中,这很容易花费几分钟)
我的问题具体是:如何在Python中的进程之间共享(不是这样)大量数据? 此外,我想知道在 Queue 的内部实现中,这种迟缓来自哪里
我也遇到了这个问题。我正在发送大型 numpy 数组(~300MB),它在 mp.queue.get().
时太慢了在查看了 mp.Queue 的 python2.7 源代码后,我发现最慢的部分(在类 unix 系统上)是 _conn_recvall()
中的 socket_connection.c , 但我没有深入研究。
为了解决这个问题,我构建了一个实验包 FMQ。
This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data item because of the speed limitation of pipe (on Unix-like systems).
With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread, which steals an item from mp.Queue once any item is available, and puts it into a Queue.Queue. Then, the consumer process can fetch the data from the Queue.Queue immediately.
The speed-up is based on the assumption that both producer and consumer processes are compute-intensive (thus multiprocessing is neccessary) and the data is large (eg. >50 227x227 images). Otherwise mp.Queue with multiprocessing or Queue.Queue with threading is good enough.
fmq.Queue 像 mp.Queue.
一样容易使用请注意还有一些 Known Issues,因为这个项目还处于早期阶段。
对于未来的读者,您还可以尝试使用:
q = multiprocessing.Manager().Queue()
而不只是
q = multiprocessing.Queue()
我还没有完全提炼和理解这种行为背后的机制,但我读过的一篇 source 声称它是关于:
"when pushing large items onto the queue, the items are essentially buffered, despite the immediate return of the queue’s put function."
作者继续解释更多关于它的信息和修复方法,但对我来说,添加管理器使技巧变得简单而干净。
更新:我相信
已接受的答案中提到的 FMQ 也是 Python2 独家的,这也是我觉得这个答案有朝一日可能会帮助更多人的原因之一。