流程完成但无法加入?

Process finishes but cannot be joined?

为了加速某项任务,我正在 classing Process 创建一个 worker,它将 处理 样本中的数据。一些管理 class 将向其提供数据并读取输出(使用两个 Queue 实例)。对于异步操作,我使用 put_nowaitget_nowait。最后,我向我的进程发送了一个特殊的退出代码,它打破了它的内部循环。然而……它永远不会发生。这是一个最小的可重现示例:

import multiprocessing as mp

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    # prepare
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    # get to work
    worker.start()
    in_queue.put_nowait(list(range(10**5))) # XXX
    # clean up
    print("NOTIFYING")
    in_queue.put_nowait(None)
    #out_queue.get() # XXX
    print("JOINING")
    worker.join()

Processor()

此代码永远不会完成,像这样永久挂起:

NOTIFYING
JOINING
    WORKER DEAD

为什么?

我用 XXX 标记了两行。在第一个中,如果我发送较少的数据(比如 10**4),一切都会正常完成(进程按预期加入)。同样在第二个,如果我get()通知工人完成后。我知道我遗漏了一些东西,但 documentation 中似乎没有任何相关内容。

文档提到

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

此外

whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.

https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

这意味着您所描述的行为可能是由工作程序 self.output_queue.put_nowait(received) 与处理器 __init__ 中的 worker.join() 加入工作程序之间的竞争条件引起的。如果加入比将其放入队列更快,那么一切都很好。如果太慢,队列中有项目,工作人员不会加入。

在主进程中取消对 out_queue.get() 的注释将清空队列,从而允许加入。但是,如果队列已经为空,则队列 return 很重要,因此使用 time-out 可能是尝试等待比赛条件结束的一种选择,例如 out_qeue.get(timeout=10).

可能重要的也可能是保护主例程,特别是对于 Windows (python multiprocessing on windows, if __name__ == "__main__")