multiprocessing.Queue 中的大对象死锁
Deadlock with big object in multiprocessing.Queue
当您向 multiprocessing.Queue
提供足够大的对象时,程序似乎会在奇怪的地方挂起。考虑这个最小的例子:
import multiprocessing
def dump_dict(queue, size):
queue.put({x: x for x in range(size)})
print("Dump finished")
if __name__ == '__main__':
SIZE = int(1e5)
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
print("Starting...")
process.start()
print("Joining...")
process.join()
print("Done")
print(len(queue.get()))
如果 SIZE
参数足够小(<= 1e4 至少在我的例子中),整个程序运行顺利没有问题,但是一旦 SIZE
足够大,该程序挂在奇怪的地方。现在,在搜索解释时,即 python multiprocessing - process hangs on join for large queue,我总是看到 "you need to consume from the queue" 的一般答案。但是看起来奇怪的是程序实际上打印了 Dump finished
即在将对象放入 queue
之后到达代码行。此外,使用 Queue.put_nowait
而不是 Queue.put
并没有什么不同。
最后,如果您使用 Process.join(1)
而不是 Process.join()
,整个过程将完成 ,队列中有完整的字典 (即 print(len(..))
行将打印 10000
).
有人可以让我对此有更多了解吗?
您需要在 process.join()
之前的 parent 中 queue.get()
以防止出现死锁。队列已经生成了一个 feeder-thread,它的第一个 queue.put()
和 worker-process 中的 MainThread
在退出之前加入了这个 feeder-thread。所以 worker-process 在结果完全刷新到 (OS-pipe-) 缓冲区之前不会退出,但是你的结果太大而无法放入缓冲区并且你的 parent 没有读取从队列中直到工人退出,导致死锁。
您看到 print("Dump finished")
的输出,因为实际发送发生在 feeder-thread,queue.put()
本身只是附加到 worker-process 中的 collections.deque
作为中间步骤。
面对类似的问题,我使用@Darkonaut 的回答和以下实现解决了它:
import time
done = 0
while done < n: # n is the number of objects you expect to get
if not queue.empty():
done += 1
results = queue.get()
# Do something with the results
else:
time.sleep(.5)
感觉不是很 pythonic,但它起作用了!
当您向 multiprocessing.Queue
提供足够大的对象时,程序似乎会在奇怪的地方挂起。考虑这个最小的例子:
import multiprocessing
def dump_dict(queue, size):
queue.put({x: x for x in range(size)})
print("Dump finished")
if __name__ == '__main__':
SIZE = int(1e5)
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
print("Starting...")
process.start()
print("Joining...")
process.join()
print("Done")
print(len(queue.get()))
如果 SIZE
参数足够小(<= 1e4 至少在我的例子中),整个程序运行顺利没有问题,但是一旦 SIZE
足够大,该程序挂在奇怪的地方。现在,在搜索解释时,即 python multiprocessing - process hangs on join for large queue,我总是看到 "you need to consume from the queue" 的一般答案。但是看起来奇怪的是程序实际上打印了 Dump finished
即在将对象放入 queue
之后到达代码行。此外,使用 Queue.put_nowait
而不是 Queue.put
并没有什么不同。
最后,如果您使用 Process.join(1)
而不是 Process.join()
,整个过程将完成 ,队列中有完整的字典 (即 print(len(..))
行将打印 10000
).
有人可以让我对此有更多了解吗?
您需要在 process.join()
之前的 parent 中 queue.get()
以防止出现死锁。队列已经生成了一个 feeder-thread,它的第一个 queue.put()
和 worker-process 中的 MainThread
在退出之前加入了这个 feeder-thread。所以 worker-process 在结果完全刷新到 (OS-pipe-) 缓冲区之前不会退出,但是你的结果太大而无法放入缓冲区并且你的 parent 没有读取从队列中直到工人退出,导致死锁。
您看到 print("Dump finished")
的输出,因为实际发送发生在 feeder-thread,queue.put()
本身只是附加到 worker-process 中的 collections.deque
作为中间步骤。
面对类似的问题,我使用@Darkonaut 的回答和以下实现解决了它:
import time
done = 0
while done < n: # n is the number of objects you expect to get
if not queue.empty():
done += 1
results = queue.get()
# Do something with the results
else:
time.sleep(.5)
感觉不是很 pythonic,但它起作用了!