如何使用 Queue 与 Python 进行多处理?

How to use Queue for multiprocessing with Python?

这个程序工作正常,它应该输出:0 1 2 3。

from multiprocessing import Process, Queue

NTHREADS = 4

def foo(queue, id):
    queue.put(id)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for id in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue, id)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get())

但这个不是。 我认为它在 join() 之后停止了。

from multiprocessing import Process, Queue
from PIL import Image

NTHREADS = 4

def foo(queue):
    img = Image.new('RGB', (200,200), color=(255,0,0))
    queue.put(img)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for i in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue,)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get().size)

为什么?我怎样才能到达终点?我怎样才能得到我的形象? 我想并行处理 4 张图像,然后将它们合并为一张最终图像。

队列是幕后的复杂野兽。当一个(pickle of an)对象被放入队列时,它的一部分被送入底层 OS 进程间通信机制,但其余部分留在内存中 Python 缓冲区,以避免淹没 OS 设施。内存缓冲区中的东西被送入 OS 机制,因为接收端通过从队列中取出东西来腾出更多空间。

结果是工作进程不能在其内存缓冲区(送入队列)为空之前结束。

在您的第一个程序中,整数的 pickle 非常小,以至于内存缓冲区无法发挥作用。一个工人把整个泡菜一口gulp喂给OS,然后工人就可以退出了。

但是在你的第二个程序中,泡菜要大得多。一个工人将一部分 pickle 发送到 OS,然后等待主程序将其从 OS 机制中取出,这样它就可以喂下一部分 pickle。由于您的程序在调用 .join() 之前不会从队列中取出任何东西,因此工作人员将永远等待。

所以,一般来说,这是规则:在所有队列都用完之前,永远不要尝试 .join()

请注意来自 the docs 的内容:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

另外,queue.empty() 是一个糟糕的测试方法。这只能告诉您数据恰好在执行时是否在队列中。在并行处理中,这充其量只是对事实的概率近似。在你的第二个例子中,你确切地知道你希望从队列中获得多少项目,所以这种方式是可靠的:

for proc in procs:
    proc.start()

for i in range(NTHREADS):
    print(queue.get().size)

for proc in procs: # join AFTER queue is drained
    proc.join()