multiprocessing.Queue 作为 arg 到 pool worker 中止执行 worker

multiprocessing.Queue as arg to pool worker aborts execution of worker

我真的很难相信我已经 运行 解决了我遇到的问题,这似乎是 python 多处理模块中的一个大错误...无论如何,我 运行 遇到的问题是,每当我将 multiprocessing.Queue 作为参数传递给 multiprocessing.Pool 工作人员时,池工作人员永远不会执行其代码。即使是在 python docs.

中找到的示例代码的稍微修改版本的非常简单的测试中,我也能够重现此错误

这是队列示例代码的原始版本:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

这是我对队列示例代码的修改版本:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

我所做的就是使 p 成为大小为 1 的进程池而不是 multiprocessing.Process 对象,结果是代码永远挂在 print 语句上,因为没有任何内容被写入队列!当然,我以其原始形式对其进行了测试,并且效果很好。我的 OS 是 windows 10,我的 python 版本是 3。5.x,有人知道为什么会这样吗?

更新:仍然不知道为什么这个示例代码适用于 multiprocessing.Process 而不是 multiprocessing.Pool 但我发现 work around 我很满意(Alex Martelli 的回答)。显然你可以制作一个 multiprocessing.Queue 的全局列表并传递每个进程和索引以供使用,我将避免使用托管队列,因为它们速度较慢。感谢客人给我看 link.

问题

当你调用apply_async it returns a AsyncResult object and leaves the workload distribution to a separate thread (see also this answer). This thread encounters the problem that the Queue object can't be pickled and therefore the requested work can't be distributed (and eventually executed). We can see this by calling AsyncResult.get时:

r = p.apply_async(f,args=(q,))
r.get()

这引发了 RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

然而,这个 RuntimeError 只有在您请求结果后才会在主线程中引发,因为它实际上发生在不同的线程中(因此需要一种传输方式)。

那么当你这样做时会发生什么

p.apply_async(f,args=(q,))

是目标函数 f 从未被调用,因为它的参数之一 (q) 无法被 pickle。因此 q 永远不会收到一个项目并保持为空,因此在主线程中对 q.get 的调用将永远阻塞。

解决方案

使用 apply_async,您不必手动管理结果队列,但它们很容易以 AsyncResult 对象的形式提供给您。因此,您可以将代码修改为简单地 return 来自目标函数:

from multiprocessing import Queue, Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())