multiprocessing queue not joining

I'm using multiprocessing (and pebble) to create one or more producer processes and one or more consumer processes. The producers add items to a queue, the consumers pull items off the queue and then call task_done. My main function calls the queue's join(), which should block until

all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue)
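(As an aside, that contract can be seen in isolation with a plain multiprocessing.JoinableQueue; the following is just a minimal illustration of the documented behaviour, not part of my program:)

import multiprocessing as mp

def consume(q):
    while True:
        item = q.get()
        # ... pretend to process item ...
        q.task_done()  # exactly one task_done() per item taken off the queue

if __name__ == "__main__":
    q = mp.JoinableQueue()
    p = mp.Process(target=consume, args=(q,), daemon=True)
    p.start()
    for i in range(3):
        q.put(i)
    q.join()  # returns once task_done() has been received for all three items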

However, the join blocks forever, even though task_done has been called for every item that was put on the queue. See the example below:

from functools import partial
import multiprocessing as mp
import pebble
import queue
import time    

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2) # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num    

def worker(q, output, done):
    # continually pull items off the queue until the done event is set
    while True:
        if done.set():
            return
        try:
            print("Getting from queue")
            num = q.get(block=True, timeout=10)
            print("Got from queue")
        except queue.Empty:
            print("EMPTY QUEUE")
            # If I add this it errors saying "called too many times"
            # q.task_done() 
            continue
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")    

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    done = mgr.Event()
    workers = []    

    add_partial = partial(add_to_queue, q=q)
    with pebble.ProcessPool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        map_future = pool.map(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.schedule(worker, args=(q, output, done))
            workers.append(ftr)
    
        for r in map_future.result():
            print(r)
    
        print("Joining Queue")
        # NEVER Joins even though we've called `task_done` for each input in `nums`
        q.join()
        done.set()
        for w in workers:
            w.result()
        
    print(output)
    
        
if __name__ == "__main__":
    main()

The problem is in your worker function, where you call done.set instead of done.is_set():

def worker(q, output, done):
    # continually pull items off the queue until the done event is set
    while True:
        #if done.set(): # oops!
        if done.is_set():
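To make the difference concrete (a standalone snippet, not taken from your code): Event.set() raises the flag and returns None, which is falsy, so the if branch is never taken (and the event gets set on every loop iteration); Event.is_set() only reports the flag.

import multiprocessing as mp

if __name__ == "__main__":
    mgr = mp.Manager()
    done = mgr.Event()
    print(done.is_set())  # False -- the flag has not been set yet
    print(done.set())     # None  -- set() raises the flag but returns nothing (falsy)
    print(done.is_set())  # True  -- the flag is now set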

But if I may, a comment and a suggestion.

First, the comment:

Why are you mixing a multiprocessing pool, which under the covers already uses a multiprocessing.Queue or a similar structure (I'm not familiar with pebble) to pass tasks, i.e. a worker function and its arguments, to the pool's processes, with a managed queue for handing work to that worker function? I would only use an explicit queue if I were implementing my own processing pool out of multiprocessing.Process instances.
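For example, if each input item can be handled entirely inside one worker call, something like the following sketch would do (untested and schematic; process_num is a hypothetical stand-in that combines the producer's and consumer's sleeps from your code):

from multiprocessing import Pool
import time

def process_num(num):
    # produce and consume the item in a single step
    time.sleep(2)    # the "producer" work
    time.sleep(num)  # the "consumer" work
    return num

if __name__ == "__main__":
    nums = [1, 2, 3, 4, 5]
    with Pool(2) as pool:
        # the pool's own internal queue distributes the work;
        # no managed queue, done event or task_done bookkeeping is needed
        output = pool.map(process_num, nums)
    print(output)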

Now, the suggestion:

But if you insist on this design, the processing can be greatly simplified by putting special "end of file" records on the input queue, one per checker, which signal to a checker that no more records will be put on the queue and that it can now terminate. This removes the need for the done event and for the timeout on the queue.get() call. The "end of file" record only has to be distinguishable from a "normal" record, and here None serves that purpose perfectly. Since I'm not familiar with pebble, the code has been tested with the multiprocessing.Pool class; an untested pebble version follows it.

multiprocessing.Pool version

from functools import partial
import multiprocessing as mp
import queue
import time

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2) # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num

def worker(q, output):
    # continually pull items off the queue until end of file:
    while True:
        print("Getting from queue")
        num = q.get(block=True)
        if num is None:
            q.task_done()
            print("task done")
            break # end of file
        print("Got from queue")
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    workers = []

    add_partial = partial(add_to_queue, q=q)
    with mp.Pool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        # add end of file indicators, one per checker:
        nums += [None] * n_checkers
        map_result = pool.map_async(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.apply_async(worker, args=(q, output))
            workers.append(ftr)

        for r in map_result.get():
            print(r)

        print("Joining Queue")
        # blocks until task_done() has been called for every item, including the None end-of-file records
        q.join()
        for w in workers:
            w.get()

    print(output)


if __name__ == "__main__":
    main()

pebble version

from functools import partial
import multiprocessing as mp
import pebble
import queue
import time    

def add_to_queue(num, q):
    # add num to the queue `q`
    time.sleep(2) # pretend i'm doing work
    print("putting on queue")
    q.put(num)
    print("put on queue done")
    return num    

def worker(q, output):
    # continually pull items off the queue until end of file:
    while True:
        print("Getting from queue")
        num = q.get()
        if num is None: # end of file record
            q.task_done()
            print("task done")
            break
        print("Got from queue")
        time.sleep(num)
        output.append(num)
        # mark item as processed
        q.task_done()
        print("task done")    

def main(n_checkers=1):
    mgr = mp.Manager()
    q = mgr.Queue()
    output = mgr.list()
    workers = []    

    add_partial = partial(add_to_queue, q=q)
    with pebble.ProcessPool(n_checkers) as pool:
        nums = [1, 2, 3, 4, 5]
        # add end of file records, one for each checker:
        nums += [None] * n_checkers
        map_future = pool.map(add_partial, nums)
        for i in range(n_checkers):
            print("SCHEDULING WORKER", i)
            ftr = pool.schedule(worker, args=(q, output))
            workers.append(ftr)
    
        for r in map_future.result():
            print(r)
    
        print("Joining Queue")
        q.join()
        for w in workers:
            w.result()
        
    print(output)
    
        
if __name__ == "__main__":
    main()