如何使用 Queue 与 Python 进行多处理?
How to use Queue for multiprocessing with Python?
这个程序工作正常,它应该输出:0 1 2 3。
from multiprocessing import Process, Queue
NTHREADS = 4
def foo(queue, id):
queue.put(id)
if __name__ == '__main__':
queue = Queue()
procs = []
for id in range(NTHREADS):
procs.append(Process(target=foo, args=(queue, id)))
for proc in procs:
proc.start()
for proc in procs:
proc.join()
while not queue.empty():
print(queue.get())
但这个不是。
我认为它在 join() 之后停止了。
from multiprocessing import Process, Queue
from PIL import Image
NTHREADS = 4
def foo(queue):
img = Image.new('RGB', (200,200), color=(255,0,0))
queue.put(img)
if __name__ == '__main__':
queue = Queue()
procs = []
for i in range(NTHREADS):
procs.append(Process(target=foo, args=(queue,)))
for proc in procs:
proc.start()
for proc in procs:
proc.join()
while not queue.empty():
print(queue.get().size)
为什么?我怎样才能到达终点?我怎样才能得到我的形象?
我想并行处理 4 张图像,然后将它们合并为一张最终图像。
队列是幕后的复杂野兽。当一个(pickle of an)对象被放入队列时,它的一部分被送入底层 OS 进程间通信机制,但其余部分留在内存中 Python 缓冲区,以避免淹没 OS 设施。内存缓冲区中的东西被送入 OS 机制,因为接收端通过从队列中取出东西来腾出更多空间。
结果是工作进程不能在其内存缓冲区(送入队列)为空之前结束。
在您的第一个程序中,整数的 pickle 非常小,以至于内存缓冲区无法发挥作用。一个工人把整个泡菜一口gulp喂给OS,然后工人就可以退出了。
但是在你的第二个程序中,泡菜要大得多。一个工人将一部分 pickle 发送到 OS,然后等待主程序将其从 OS 机制中取出,这样它就可以喂下一部分 pickle。由于您的程序在调用 .join()
之前不会从队列中取出任何东西,因此工作人员将永远等待。
所以,一般来说,这是规则:在所有队列都用完之前,永远不要尝试 .join()
。
请注意来自 the docs 的内容:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.
另外,queue.empty()
是一个糟糕的测试方法。这只能告诉您数据恰好在执行时是否在队列中。在并行处理中,这充其量只是对事实的概率近似。在你的第二个例子中,你确切地知道你希望从队列中获得多少项目,所以这种方式是可靠的:
for proc in procs:
proc.start()
for i in range(NTHREADS):
print(queue.get().size)
for proc in procs: # join AFTER queue is drained
proc.join()
这个程序工作正常,它应该输出:0 1 2 3。
from multiprocessing import Process, Queue
NTHREADS = 4
def foo(queue, id):
queue.put(id)
if __name__ == '__main__':
queue = Queue()
procs = []
for id in range(NTHREADS):
procs.append(Process(target=foo, args=(queue, id)))
for proc in procs:
proc.start()
for proc in procs:
proc.join()
while not queue.empty():
print(queue.get())
但这个不是。 我认为它在 join() 之后停止了。
from multiprocessing import Process, Queue
from PIL import Image
NTHREADS = 4
def foo(queue):
img = Image.new('RGB', (200,200), color=(255,0,0))
queue.put(img)
if __name__ == '__main__':
queue = Queue()
procs = []
for i in range(NTHREADS):
procs.append(Process(target=foo, args=(queue,)))
for proc in procs:
proc.start()
for proc in procs:
proc.join()
while not queue.empty():
print(queue.get().size)
为什么?我怎样才能到达终点?我怎样才能得到我的形象? 我想并行处理 4 张图像,然后将它们合并为一张最终图像。
队列是幕后的复杂野兽。当一个(pickle of an)对象被放入队列时,它的一部分被送入底层 OS 进程间通信机制,但其余部分留在内存中 Python 缓冲区,以避免淹没 OS 设施。内存缓冲区中的东西被送入 OS 机制,因为接收端通过从队列中取出东西来腾出更多空间。
结果是工作进程不能在其内存缓冲区(送入队列)为空之前结束。
在您的第一个程序中,整数的 pickle 非常小,以至于内存缓冲区无法发挥作用。一个工人把整个泡菜一口gulp喂给OS,然后工人就可以退出了。
但是在你的第二个程序中,泡菜要大得多。一个工人将一部分 pickle 发送到 OS,然后等待主程序将其从 OS 机制中取出,这样它就可以喂下一部分 pickle。由于您的程序在调用 .join()
之前不会从队列中取出任何东西,因此工作人员将永远等待。
所以,一般来说,这是规则:在所有队列都用完之前,永远不要尝试 .join()
。
请注意来自 the docs 的内容:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.
另外,queue.empty()
是一个糟糕的测试方法。这只能告诉您数据恰好在执行时是否在队列中。在并行处理中,这充其量只是对事实的概率近似。在你的第二个例子中,你确切地知道你希望从队列中获得多少项目,所以这种方式是可靠的:
for proc in procs:
proc.start()
for i in range(NTHREADS):
print(queue.get().size)
for proc in procs: # join AFTER queue is drained
proc.join()