multiprocessing queue not joining
I am using multiprocessing (and pebble) to create one or more producer processes and one or more consumer processes. The producers add items to a queue, and the consumers pull items off the queue and then call task_done. My main function calls the queue's join() method, which should block until
"all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue)".
However, the join blocks forever, even though task_done is called for every item that was put on the queue. See the example below:
from functools import partial
import multiprocessing as mp
import pebble
import queue
import time

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2)  # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num

def worker(q, output, done):
    # continually pull items off the queue until the done event is set
    while True:
        if done.set():
            return
        try:
            print("Getting from queue")
            num = q.get(block=True, timeout=10)
            print("Got from queue")
        except queue.Empty:
            print("EMPTY QUEUE")
            # If I add this it errors saying "task_done() called too many times"
            # q.task_done()
            continue
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    done = mgr.Event()
    workers = []
    add_partial = partial(add_to_queue, q=q)
    with pebble.ProcessPool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        map_future = pool.map(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.schedule(worker, args=(q, output, done))
            workers.append(ftr)

        for r in map_future.result():
            print(r)

        print("Joining Queue")
        # NEVER Joins even though we've called `task_done` for each input in `nums`
        q.join()
        done.set()

        for w in workers:
            w.result()

    print(output)


if __name__ == "__main__":
    main()
The problem is in your worker function, where you call done.set() instead of done.is_set(). Event.set() sets the flag and returns None, so the test is always falsy and the workers never notice the event:
def worker(q, output, done):
    # continually pull items off the queue until the done event is set
    while True:
        #if done.set(): # oops!
        if done.is_set():
But if I may offer a comment and a suggestion.

First, the comment:
Why are you mixing a multiprocessing pool, which under the covers uses a multiprocessing.Queue or similar structure (I am not familiar with pebble) to pass tasks consisting of a worker function and its arguments to the pool processes, with a managed queue for passing work to your worker function? I would only use an explicit queue if I were implementing my own processing pool out of multiprocessing.Process instances.
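
For illustration only, here is a minimal sketch of what that comment is driving at (none of this is from the question; process_num is a made-up stand-in that folds the producer's and the checker's work into one function). The pool's own machinery already carries the arguments to the worker function and the results back, so there is nothing extra to hand off and no queue to join at all:

import multiprocessing as mp
import time


def process_num(num):
    # hypothetical stand-in: the work add_to_queue pretended to do,
    # followed by the work the checker did with the item
    time.sleep(2)
    time.sleep(num)
    return num


def main():
    with mp.Pool(2) as pool:
        # the pool's internal task queue carries the work; no Manager().Queue() needed
        output = pool.map(process_num, [1, 2, 3, 4, 5])
    print(output)


if __name__ == "__main__":
    main()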
Now the suggestion:
If you insist on this design, processing can be greatly simplified by putting special "end of file" records on the input queue, one for each checker, which signal to the checkers that no more records will be put on the queue and that they can now terminate. This removes the need for the done event and for the timeout on the queue.get() calls. An "end of file" record only has to be distinguishable from a "normal" record, and in this case None serves the purpose perfectly. Since I am not familiar with pebble, this has been tested with the multiprocessing.Pool class; an untested pebble version follows it.
multiprocessing.Pool version
from functools import partial
import multiprocessing as mp
import queue
import time

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2)  # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num


def worker(q, output):
    # continually pull items off the queue until end of file:
    while True:
        print("Getting from queue")
        num = q.get(block=True)
        if num is None:
            q.task_done()
            print("task done")
            break  # end of file
        print("Got from queue")
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    workers = []
    add_partial = partial(add_to_queue, q=q)
    with mp.Pool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        # add end of file indicators, one per checker:
        nums += [None] * n_checkers
        map_result = pool.map_async(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.apply_async(worker, args=(q, output))
            workers.append(ftr)

        for r in map_result.get():
            print(r)

        print("Joining Queue")
        # joins once a `task_done` has been called for every item put on the queue
        q.join()

        for w in workers:
            w.get()

    print(output)


if __name__ == "__main__":
    main()
pebble version
from functools import partial
import multiprocessing as mp
import pebble
import queue
import time

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2)  # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num


def worker(q, output):
    # continually pull items off the queue until end of file:
    while True:
        print("Getting from queue")
        num = q.get()
        if num is None:  # end of file record
            q.task_done()
            print("task done")
            break
        print("Got from queue")
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    workers = []
    add_partial = partial(add_to_queue, q=q)
    with pebble.ProcessPool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        # add end of file records, one for each checker:
        nums += [None] * n_checkers
        map_future = pool.map(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.schedule(worker, args=(q, output))
            workers.append(ftr)

        for r in map_future.result():
            print(r)

        print("Joining Queue")
        q.join()

        for w in workers:
            w.result()

    print(output)


if __name__ == "__main__":
    main()
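
And, as noted in the comment above, an explicit queue (with the same None sentinel) is the natural fit when you roll your own pool out of multiprocessing.Process instances. Purely as an untested sketch of that variant, assuming a JoinableQueue and a checker function invented for this example (neither appears in the question):

import multiprocessing as mp
import time


def checker(q, output):
    # pull items until the None end-of-file record arrives
    while True:
        num = q.get()
        if num is None:
            q.task_done()
            break
        time.sleep(num)
        output.append(num)
        q.task_done()


def main(n_checkers=2):
    q = mp.JoinableQueue()
    mgr = mp.Manager()
    output = mgr.list()
    checkers = [mp.Process(target=checker, args=(q, output)) for _ in range(n_checkers)]
    for p in checkers:
        p.start()
    for num in [1, 2, 3, 4, 5]:
        q.put(num)
    for _ in range(n_checkers):
        q.put(None)  # one end-of-file record per checker
    q.join()  # returns once every put() has a matching task_done()
    for p in checkers:
        p.join()
    print(list(output))


if __name__ == "__main__":
    main()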