Python 多处理 - 'Queue' 对象没有属性 'task_done' / 'join'

Python Multiprocessing - 'Queue' object has no attribute 'task_done' / 'join'

我正在将线程进程重写为多处理队列,以尝试加速大型计算。我已经完成了 95% 的进度,但是我不知道如何在 Queue 为空时使用 multiprocessing.

发出信号

我原来的代码是这样的:

import Queue
from threading import Thread

num_fetch_threads = 4
enclosure_queue = Queue()

for i in range(num_fetch_threads):
  worker = Thread(target=run_experiment, args=(i, enclosure_queue))
  worker.setDaemon(True)
  worker.start()

for experiment in experiment_collection:
  enclosure_queue.put((experiment, otherVar))

enclosure_queue.join()

队列函数是这样的:

def run_experiment(i, q):
  while True:
    ... do stuff ...
    q.task_done()

我的新代码是这样的:

from multiprocessing import Process, Queue

num_fetch_threads = 4
enclosure_queue = Queue()

for i in range(num_fetch_threads):
  worker = Process(target=run_experiment, args=(i, enclosure_queue))
  worker.daemon = True
  worker.start()

for experiment in experiment_collection:
  enclosure_queue.put((experiment, otherVar))

worker.join() ## I only put this here bc enclosure_queue.join() is not available

以及新队列函数:

def run_experiment(i, q):
  while True:
    ... do stuff ...
    ## not sure what should go here

我一直在阅读文档和 Google,但无法弄清楚我遗漏了什么 - 我知道 task_done / join 不属于 multiprocessing Queue class,但不清楚我应该使用什么。

"They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s Queue.Queue class." Source

但如果没有其中任何一个,我不确定队列如何知道它已完成,以及如何继续执行该程序。

考虑使用 multiprocessing.Pool 而不是手动管理工作人员。 Pool 处理对 worker 的调度任务,具有 map 和 apply 等方便的功能,并支持 .close.join 方法。 Pool 负责处理进程之间的队列并处理结果。以下是您的代码使用 multiprocessing.Pool:

时的样子
from multiprocessing import Pool

def do_experiment(exp):
    # run the experiment `exp`, will be called by `p.map`
    return result

p = Pool() # automatically scales to the number of CPUs available

results = p.map(do_experiment, experiment_collection)
p.close()
p.join()