使用 multiprocessing.JoinableQueue 避免队列溢出导致的死锁
Avoiding deadlocks due to queue overflow with multiprocessing.JoinableQueue
假设我们有一个 multiprocessing.Pool
,其中工作线程共享一个 multiprocessing.JoinableQueue
,使工作项出队并可能使更多工作入队:
def worker_main(queue):
while True:
work = queue.get()
for new_work in process(work):
queue.put(new_work)
queue.task_done()
当队列填满时,queue.put()
将阻塞。只要至少有一个进程从 queue.get()
的队列中读取,它就会释放队列中的 space 以解除对写入者的阻塞。但是所有进程都可能同时阻塞在 queue.put()
。
有没有办法避免像这样被卡住?
根据 process(work)
创建更多项目的频率,除了无限大的队列之外可能没有解决方案。
简而言之,您的队列必须足够大,以容纳您随时可能拥有的全部积压工作项。
自 queue is implemented with semaphores, there may indeed be a hard size limit of SEM_VALUE_MAX
which in MacOS is 32767。因此,如果这还不够,您需要对该实现进行子类化或使用 put(block=False)
并处理 queue.Full
(例如,将多余的项目放在其他地方)。
或者,查看 one of the 3rd-party implementations of distributed work item queue for Python。
假设我们有一个 multiprocessing.Pool
,其中工作线程共享一个 multiprocessing.JoinableQueue
,使工作项出队并可能使更多工作入队:
def worker_main(queue):
while True:
work = queue.get()
for new_work in process(work):
queue.put(new_work)
queue.task_done()
当队列填满时,queue.put()
将阻塞。只要至少有一个进程从 queue.get()
的队列中读取,它就会释放队列中的 space 以解除对写入者的阻塞。但是所有进程都可能同时阻塞在 queue.put()
。
有没有办法避免像这样被卡住?
根据 process(work)
创建更多项目的频率,除了无限大的队列之外可能没有解决方案。
简而言之,您的队列必须足够大,以容纳您随时可能拥有的全部积压工作项。
自 queue is implemented with semaphores, there may indeed be a hard size limit of SEM_VALUE_MAX
which in MacOS is 32767。因此,如果这还不够,您需要对该实现进行子类化或使用 put(block=False)
并处理 queue.Full
(例如,将多余的项目放在其他地方)。
或者,查看 one of the 3rd-party implementations of distributed work item queue for Python。