Python 多处理 - 'Queue' 对象没有属性 'task_done' / 'join'
Python Multiprocessing - 'Queue' object has no attribute 'task_done' / 'join'
我正在将线程进程重写为多处理队列,以尝试加速大型计算。我已经完成了 95% 的进度,但是我不知道如何在 Queue
为空时使用 multiprocessing
.
发出信号
我原来的代码是这样的:
import Queue
from threading import Thread
num_fetch_threads = 4
enclosure_queue = Queue()
for i in range(num_fetch_threads):
worker = Thread(target=run_experiment, args=(i, enclosure_queue))
worker.setDaemon(True)
worker.start()
for experiment in experiment_collection:
enclosure_queue.put((experiment, otherVar))
enclosure_queue.join()
队列函数是这样的:
def run_experiment(i, q):
while True:
... do stuff ...
q.task_done()
我的新代码是这样的:
from multiprocessing import Process, Queue
num_fetch_threads = 4
enclosure_queue = Queue()
for i in range(num_fetch_threads):
worker = Process(target=run_experiment, args=(i, enclosure_queue))
worker.daemon = True
worker.start()
for experiment in experiment_collection:
enclosure_queue.put((experiment, otherVar))
worker.join() ## I only put this here bc enclosure_queue.join() is not available
以及新队列函数:
def run_experiment(i, q):
while True:
... do stuff ...
## not sure what should go here
我一直在阅读文档和 Google,但无法弄清楚我遗漏了什么 - 我知道 task_done
/ join
不属于 multiprocessing
Queue
class,但不清楚我应该使用什么。
"They differ in that Queue lacks the task_done() and join() methods
introduced into Python 2.5’s Queue.Queue class." Source
但如果没有其中任何一个,我不确定队列如何知道它已完成,以及如何继续执行该程序。
考虑使用 multiprocessing.Pool
而不是手动管理工作人员。 Pool 处理对 worker 的调度任务,具有 map 和 apply 等方便的功能,并支持 .close
和 .join
方法。 Pool
负责处理进程之间的队列并处理结果。以下是您的代码使用 multiprocessing.Pool
:
时的样子
from multiprocessing import Pool
def do_experiment(exp):
# run the experiment `exp`, will be called by `p.map`
return result
p = Pool() # automatically scales to the number of CPUs available
results = p.map(do_experiment, experiment_collection)
p.close()
p.join()
我正在将线程进程重写为多处理队列,以尝试加速大型计算。我已经完成了 95% 的进度,但是我不知道如何在 Queue
为空时使用 multiprocessing
.
我原来的代码是这样的:
import Queue
from threading import Thread
num_fetch_threads = 4
enclosure_queue = Queue()
for i in range(num_fetch_threads):
worker = Thread(target=run_experiment, args=(i, enclosure_queue))
worker.setDaemon(True)
worker.start()
for experiment in experiment_collection:
enclosure_queue.put((experiment, otherVar))
enclosure_queue.join()
队列函数是这样的:
def run_experiment(i, q):
while True:
... do stuff ...
q.task_done()
我的新代码是这样的:
from multiprocessing import Process, Queue
num_fetch_threads = 4
enclosure_queue = Queue()
for i in range(num_fetch_threads):
worker = Process(target=run_experiment, args=(i, enclosure_queue))
worker.daemon = True
worker.start()
for experiment in experiment_collection:
enclosure_queue.put((experiment, otherVar))
worker.join() ## I only put this here bc enclosure_queue.join() is not available
以及新队列函数:
def run_experiment(i, q):
while True:
... do stuff ...
## not sure what should go here
我一直在阅读文档和 Google,但无法弄清楚我遗漏了什么 - 我知道 task_done
/ join
不属于 multiprocessing
Queue
class,但不清楚我应该使用什么。
"They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s Queue.Queue class." Source
但如果没有其中任何一个,我不确定队列如何知道它已完成,以及如何继续执行该程序。
考虑使用 multiprocessing.Pool
而不是手动管理工作人员。 Pool 处理对 worker 的调度任务,具有 map 和 apply 等方便的功能,并支持 .close
和 .join
方法。 Pool
负责处理进程之间的队列并处理结果。以下是您的代码使用 multiprocessing.Pool
:
from multiprocessing import Pool
def do_experiment(exp):
# run the experiment `exp`, will be called by `p.map`
return result
p = Pool() # automatically scales to the number of CPUs available
results = p.map(do_experiment, experiment_collection)
p.close()
p.join()