Queue vs JoinableQueue in Python

When you use the multiprocessing module in Python, there are two kinds of queue:

What is the difference between them?

Queue

from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue

JoinableQueue

from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion

JoinableQueue has the methods join() and task_done(), which Queue does not.


class multiprocessing.Queue( [maxsize] )

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

The usual queue.Empty and queue.Full exceptions from the standard library's queue module are raised to signal timeouts.

Queue implements all the methods of queue.Queue except for task_done() and join().
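As a quick illustration of the behavior described above (the item values here are just for the demo), a timed get() on an empty Queue signals the timeout with queue.Empty:

```python
import multiprocessing as mp
import queue  # in Python 3, Empty/Full live in the `queue` module

if __name__ == '__main__':
    q = mp.Queue()
    q.put('hello')
    print(q.get())        # hello

    # The queue is now empty; a get() with a timeout raises queue.Empty.
    try:
        q.get(timeout=0.1)
    except queue.Empty:
        print('nothing left to get')
```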


class multiprocessing.JoinableQueue( [maxsize] )

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.
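A minimal demonstration of that ValueError: one put() allows exactly one task_done(), and a second call is one too many.

```python
import multiprocessing as mp

if __name__ == '__main__':
    q = mp.JoinableQueue()
    q.put('job')           # unfinished-task count: 1
    q.get()
    q.task_done()          # count back to 0
    try:
        q.task_done()      # one call too many
    except ValueError:
        print('too many task_done() calls')
```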

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.


If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue, or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

Based on the documentation, it is hard to be sure that a Queue is actually empty. With JoinableQueue you can wait for the queue to empty by calling q.join(). In cases where you want to complete work in discrete batches and do something discrete at the end of each batch, this could be helpful.

For example, maybe you process 1000 items at a time through the queue, then send a push notification to users that you've completed another batch. This would be challenging to implement with a plain Queue.

It might look something like:

import multiprocessing as mp

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

def consume(q):
  for item in iter(q.get, STOP_VALUE):
    try:
      process(item)
    # Be very defensive about errors since they can corrupt pipes.
    except Exception as e:
      logger.error(e)
    finally:
      q.task_done()
  q.task_done()  # acknowledge the STOP sentinel as well

q = mp.JoinableQueue()
# Run the consumers in their own processes; if they ran as pool tasks they
# would occupy every pool worker and expensive_func would never get scheduled.
consumers = [mp.Process(target=consume, args=(q,))
             for _ in range(mp.cpu_count())]
for c in consumers:
  c.start()
with mp.Pool() as pool:
  for i in range(0, len(URLS), BATCH_SIZE):
    # Compute a batch; map_async hands the whole batch to the callback as a
    # single list, so unpack it onto the queue item by item.
    result = pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE],
                            callback=lambda rs: [q.put(r) for r in rs])
    result.wait()  # make sure the batch has been enqueued before joining
    # Wait for the queue to empty.
    q.join()
    notify_users()
# Stop the consumers so we can exit cleanly.
for _ in consumers:
  q.put(STOP_VALUE)
for c in consumers:
  c.join()

Note: I haven't actually run this code. If you pull items off the queue faster than you put them on, you might finish early. In that case this code sends an update at least every 1000 items, and possibly more often. For progress updates that's probably fine. If it's important that it be exactly 1000, you could use an mp.Value('i', 0) and check that it's 1000 whenever join releases.