multiprocessing.Queue 中的大对象死锁

Deadlock with big object in multiprocessing.Queue

当您向 multiprocessing.Queue 提供足够大的对象时,程序似乎会在奇怪的地方挂起。考虑这个最小的例子:

import multiprocessing

def dump_dict(queue, size):
  queue.put({x: x for x in range(size)})
  print("Dump finished")

if __name__ == '__main__':
  SIZE = int(1e5)
  queue = multiprocessing.Queue()
  process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
  print("Starting...")
  process.start()
  print("Joining...")
  process.join()
  print("Done")
  print(len(queue.get()))

如果 SIZE 参数足够小(<= 1e4 至少在我的例子中),整个程序运行顺利没有问题,但是一旦 SIZE 足够大,该程序挂在奇怪的地方。现在,在搜索解释时,即 python multiprocessing - process hangs on join for large queue,我总是看到 "you need to consume from the queue" 的一般答案。但是看起来奇怪的是程序实际上打印了 Dump finished 即在将对象放入 queue 之后到达代码行。此外,使用 Queue.put_nowait 而不是 Queue.put 并没有什么不同。

最后,如果您使用 Process.join(1) 而不是 Process.join(),整个过程将完成 ,队列中有完整的字典 (即 print(len(..)) 行将打印 10000).

有人可以让我对此有更多了解吗?

您需要在 process.join() 之前的 parent 中 queue.get() 以防止出现死锁。队列已经生成了一个 feeder-thread,它的第一个 queue.put() 和 worker-process 中的 MainThread 在退出之前加入了这个 feeder-thread。所以 worker-process 在结果完全刷新到 (OS-pipe-) 缓冲区之前不会退出,但是你的结果太大而无法放入缓冲区并且你的 parent 没有读取从队列中直到工人退出,导致死锁。

您看到 print("Dump finished") 的输出,因为实际发送发生在 feeder-thread,queue.put() 本身只是附加到 worker-process 中的 collections.deque作为中间步骤。

面对类似的问题,我使用@Darkonaut 的回答和以下实现解决了它:

import time
done = 0
while done < n:    # n is the number of objects you expect to get
    if not queue.empty():
        done += 1
        results = queue.get()
        # Do something with the results
    else:
        time.sleep(.5)

感觉不是很 pythonic,但它起作用了!