使用 multiprocessing.JoinableQueue 避免队列溢出导致的死锁

Avoiding deadlocks due to queue overflow with multiprocessing.JoinableQueue

假设我们有一个 multiprocessing.Pool,其中工作线程共享一个 multiprocessing.JoinableQueue,使工作项出队并可能使更多工作入队:

def worker_main(queue):
    while True:
        work = queue.get()
        for new_work in process(work):
            queue.put(new_work)
        queue.task_done()

当队列填满时,queue.put() 将阻塞。只要至少有一个进程从 queue.get() 的队列中读取,它就会释放队列中的 space 以解除对写入者的阻塞。但是所有进程都可能同时阻塞在 queue.put()

有没有办法避免像这样被卡住?

根据 process(work) 创建更多项目的频率,除了无限大的队列之外可能没有解决方案。

简而言之,您的队列必须足够大,以容纳您随时可能拥有的全部积压工作项。


queue is implemented with semaphores, there may indeed be a hard size limit of SEM_VALUE_MAX which in MacOS is 32767。因此,如果这还不够,您需要对该实现进行子类化或使用 put(block=False) 并处理 queue.Full(例如,将多余的项目放在其他地方)。

或者,查看 one of the 3rd-party implementations of distributed work item queue for Python