How to use the multiprocessing.Queue.get method?
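The code below puts three numbers into a queue. Then it tries to get the numbers back from the queue, but it never does. How do I get the data out of the queue?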
    import multiprocessing

    queue = multiprocessing.Queue()

    for i in range(3):
        queue.put(i)

    # The loop body may never run: empty() can still be True right after put().
    while not queue.empty():
        print(queue.get())
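Your code does work, some of the time.
That is because the queue does not become non-empty instantly. The implementation is more involved because it has to support communication between multiple processes, so the threads and pipes behind it cause the empty state to last a little longer than your code allows for.
See the note in the Pipes and Queues section: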
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.
- After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False
[...]
(bold emphasis mine)
If you first add a loop that waits while the queue is still empty, your code works:
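    import multiprocessing

    queue = multiprocessing.Queue()

    for i in range(3):
        queue.put(i)

    # Wait until the background thread has flushed the first item to the pipe.
    while queue.empty():
        print('queue is still empty')

    while not queue.empty():
        print(queue.get())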
When you run the above, 'queue is still empty' appears once most of the time. Sometimes it does not appear at all, and sometimes it is printed twice.
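The delay described above only affects empty(). A blocking get() with a timeout never asks the queue whether it is empty, so it is not exposed to that race. The following is a minimal sketch of that idea for a pre-filled queue; the helper name drain and the timeout value are illustrative assumptions, not something from the answers here.

```python
import multiprocessing
import queue as queue_module  # provides the Empty exception raised on timeout


def drain(q, timeout=1.0):
    """Collect items until no new item arrives within `timeout` seconds.

    Hypothetical helper: it treats a quiet period of `timeout` seconds
    as "the queue is finished", which is only reasonable for a queue
    that has already been filled.
    """
    items = []
    while True:
        try:
            # Blocks until an item is available or the timeout expires.
            items.append(q.get(timeout=timeout))
        except queue_module.Empty:
            return items


if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(3):
        q.put(i)
    print(drain(q))
```

The price of this approach is one extra timeout period at the end, spent waiting for an item that never comes.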
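I originally deleted this answer after reading @Martijn Pieters', since he described the "why this doesn't work" earlier and in more detail. Then I realized that the use case in the OP's example doesn't quite fit the canonical-sounding title "How to use multiprocessing.Queue.get method".
That's not because no child process is involved in the demonstration, but because in real applications a queue is hardly ever pre-filled and only read out afterwards. Instead, reads and writes are interleaved, with waiting times in between. The extended demonstration code Martijn showed would not work in the usual scenarios, because the while loop would break too soon whenever enqueuing doesn't keep up with reading. So here is the answer reloaded, able to deal with the usual interleaved feeds and reads: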
Don't rely on queue.empty checks for synchronization.
After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
...
empty()
Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. docs
Either use for msg in iter(queue.get, sentinel): to .get() from the queue, where you break out of the loop by passing a sentinel value (see "iter(callable, sentinel)?"):
    from multiprocessing import Queue

    SENTINEL = None

    if __name__ == '__main__':
        queue = Queue()

        for i in [*range(3), SENTINEL]:
            queue.put(i)

        # iter() keeps calling queue.get() until it returns SENTINEL.
        for msg in iter(queue.get, SENTINEL):
            print(msg)
...or use get_nowait() and handle a possible queue.Empty exception if you need a non-blocking solution.
    from multiprocessing import Queue
    from queue import Empty
    import time

    SENTINEL = None

    if __name__ == '__main__':
        queue = Queue()

        for i in [*range(3), SENTINEL]:
            queue.put(i)

        while True:
            try:
                msg = queue.get_nowait()
                if msg == SENTINEL:
                    break
                print(msg)
            except Empty:
                # do other stuff
                time.sleep(0.1)
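To see why a sentinel-driven loop copes with the interleaved case, where a while not queue.empty() loop would stop early, here is a small sketch with a producer that deliberately lags behind the reader. The function names slow_producer, consume and run_demo are made up for illustration, and a thread stands in for the producer only to keep the sketch short; that is an assumption of this example, and a multiprocessing.Process started under the if __name__ == '__main__' guard could take its place.

```python
import threading
import time
from multiprocessing import Queue

SENTINEL = None


def slow_producer(q, count, delay):
    """Put `count` integers on the queue with pauses, then the sentinel."""
    for i in range(count):
        time.sleep(delay)  # the queue is genuinely empty during these pauses
        q.put(i)
    q.put(SENTINEL)


def consume(q):
    """Block on get() until the sentinel arrives; never consults empty()."""
    return list(iter(q.get, SENTINEL))


def run_demo(count=3, delay=0.05):
    q = Queue()
    # Assumption: a thread plays the producer role for brevity.
    worker = threading.Thread(target=slow_producer, args=(q, count, delay))
    worker.start()
    received = consume(q)
    worker.join()
    return received


if __name__ == '__main__':
    print(run_demo())
```

The reader simply waits during each pause instead of concluding that the stream has ended, so every item is received in order.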
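If only one process, and only one thread within that process, is reading the queue, it would also be possible to replace the get_nowait() loop above with: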
    while True:
        if not queue.empty():  # this is not an atomic operation ...
            msg = queue.get()  # ... thread could be interrupted in between
            if msg == SENTINEL:
                break
            print(msg)
        else:
            # do other stuff
            time.sleep(0.1)
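Since a thread could drop the GIL between the if not queue.empty() check and queue.get(), this is not suitable when several threads in one process read from the queue. The same applies if multiple processes are reading from the queue.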
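For several readers, one option is to let get() do the check and the fetch in a single call, and to send one sentinel per reader so that every loop terminates. The sketch below illustrates this under stated assumptions: the names reader and run_readers are hypothetical, threads stand in for the readers, and the 0.1 second timeout is an arbitrary choice.

```python
import threading
from multiprocessing import Queue
from queue import Empty

SENTINEL = None


def reader(q, results, lock):
    """Take items until a sentinel arrives; usable with several readers."""
    while True:
        try:
            msg = q.get(timeout=0.1)  # check and fetch happen in one call
        except Empty:
            continue  # nothing yet; a real reader could do other work here
        if msg is SENTINEL:
            break
        with lock:
            results.append(msg)


def run_readers(items, reader_count=2):
    q = Queue()
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=reader, args=(q, results, lock))
               for _ in range(reader_count)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(SENTINEL)  # one sentinel per reader
    for t in threads:
        t.join()
    return sorted(results)  # arrival order across readers is not fixed


if __name__ == '__main__':
    print(run_readers(range(5)))
```

Each reader stops after consuming exactly one sentinel, so no reader is left blocking once the items run out.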
For single-producer / single-consumer scenarios, a multiprocessing.Pipe instead of a multiprocessing.Queue would be sufficient, and more performant.
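As a rough illustration of the Pipe alternative, the sentinel pattern carries over almost unchanged. In this sketch the helper names send_all and receive_all are invented for the example, and both ends live in one process only to keep it short, which relies on the few small messages fitting into the operating system's pipe buffer; in practice the sending end would be handed to another process.

```python
from multiprocessing import Pipe

SENTINEL = None


def send_all(conn, items):
    """Send every item, then the sentinel that tells the reader to stop."""
    for item in items:
        conn.send(item)
    conn.send(SENTINEL)


def receive_all(conn):
    """Call conn.recv() repeatedly until it returns the sentinel."""
    return list(iter(conn.recv, SENTINEL))


if __name__ == '__main__':
    # duplex=False gives a one-way pipe: (receiving end, sending end).
    recv_end, send_end = Pipe(duplex=False)
    send_all(send_end, range(3))
    print(receive_all(recv_end))
```

Unlike a Queue, a Pipe has no feeder thread and no internal locking, which is why it is only appropriate when exactly one side writes and one side reads.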
Check the queue before using get:
    import multiprocessing

    queue = multiprocessing.Queue()

    for i in range(3):
        queue.put(i)

    while not queue.empty():
        if not queue.empty():
            print(queue.get())