流程完成但无法加入?
Process finishes but cannot be joined?
为了加速某项任务,我正在 classing Process
创建一个 worker,它将 处理 样本中的数据。一些管理 class 将向其提供数据并读取输出(使用两个 Queue
实例)。对于异步操作,我使用 put_nowait
和 get_nowait
。最后,我向我的进程发送了一个特殊的退出代码,它打破了它的内部循环。然而……它永远不会发生。这是一个最小的可重现示例:
import multiprocessing as mp
class Worker(mp.Process):
def __init__(self, in_queue, out_queue):
super(Worker, self).__init__()
self.input_queue = in_queue
self.output_queue = out_queue
def run(self):
while True:
received = self.input_queue.get(block=True)
if received is None:
break
self.output_queue.put_nowait(received)
print("\tWORKER DEAD")
class Processor():
def __init__(self):
# prepare
in_queue = mp.Queue()
out_queue = mp.Queue()
worker = Worker(in_queue, out_queue)
# get to work
worker.start()
in_queue.put_nowait(list(range(10**5))) # XXX
# clean up
print("NOTIFYING")
in_queue.put_nowait(None)
#out_queue.get() # XXX
print("JOINING")
worker.join()
Processor()
此代码永远不会完成,像这样永久挂起:
NOTIFYING
JOINING
WORKER DEAD
为什么?
我用 XXX
标记了两行。在第一个中,如果我发送较少的数据(比如 10**4
),一切都会正常完成(进程按预期加入)。同样在第二个,如果我get()
通知工人完成后。我知道我遗漏了一些东西,但 documentation 中似乎没有任何相关内容。
文档提到
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues
此外
whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.
https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
这意味着您所描述的行为可能是由工作程序 self.output_queue.put_nowait(received)
与处理器 __init__
中的 worker.join()
加入工作程序之间的竞争条件引起的。如果加入比将其放入队列更快,那么一切都很好。如果太慢,队列中有项目,工作人员不会加入。
在主进程中取消对 out_queue.get()
的注释将清空队列,从而允许加入。但是,如果队列已经为空,则队列 return 很重要,因此使用 time-out 可能是尝试等待比赛条件结束的一种选择,例如 out_qeue.get(timeout=10)
.
可能重要的也可能是保护主例程,特别是对于 Windows (python multiprocessing on windows, if __name__ == "__main__")
为了加速某项任务,我正在 classing Process
创建一个 worker,它将 处理 样本中的数据。一些管理 class 将向其提供数据并读取输出(使用两个 Queue
实例)。对于异步操作,我使用 put_nowait
和 get_nowait
。最后,我向我的进程发送了一个特殊的退出代码,它打破了它的内部循环。然而……它永远不会发生。这是一个最小的可重现示例:
import multiprocessing as mp
class Worker(mp.Process):
def __init__(self, in_queue, out_queue):
super(Worker, self).__init__()
self.input_queue = in_queue
self.output_queue = out_queue
def run(self):
while True:
received = self.input_queue.get(block=True)
if received is None:
break
self.output_queue.put_nowait(received)
print("\tWORKER DEAD")
class Processor():
def __init__(self):
# prepare
in_queue = mp.Queue()
out_queue = mp.Queue()
worker = Worker(in_queue, out_queue)
# get to work
worker.start()
in_queue.put_nowait(list(range(10**5))) # XXX
# clean up
print("NOTIFYING")
in_queue.put_nowait(None)
#out_queue.get() # XXX
print("JOINING")
worker.join()
Processor()
此代码永远不会完成,像这样永久挂起:
NOTIFYING
JOINING
WORKER DEAD
为什么?
我用 XXX
标记了两行。在第一个中,如果我发送较少的数据(比如 10**4
),一切都会正常完成(进程按预期加入)。同样在第二个,如果我get()
通知工人完成后。我知道我遗漏了一些东西,但 documentation 中似乎没有任何相关内容。
文档提到
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues
此外
whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.
https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
这意味着您所描述的行为可能是由工作程序 self.output_queue.put_nowait(received)
与处理器 __init__
中的 worker.join()
加入工作程序之间的竞争条件引起的。如果加入比将其放入队列更快,那么一切都很好。如果太慢,队列中有项目,工作人员不会加入。
在主进程中取消对 out_queue.get()
的注释将清空队列,从而允许加入。但是,如果队列已经为空,则队列 return 很重要,因此使用 time-out 可能是尝试等待比赛条件结束的一种选择,例如 out_qeue.get(timeout=10)
.
可能重要的也可能是保护主例程,特别是对于 Windows (python multiprocessing on windows, if __name__ == "__main__")