multiprocessing.Process 在队列为空时不退出

multiprocessing.Process not exiting on empty Queue

我有大量对象需要迭代处理,我认为使用多进程(multiprocessing)可以大大加快速度。但是,一旦我增加核心数,下面这个简单的示例似乎就会挂起。

它挂在 p.join() 这一行。如果我终止程序并检查,q_in.empty() 返回 True,并且输出队列中的项目数量是正确的。

是什么导致它挂起?

from multiprocessing import Process, Queue
import time

def worker_func(q_in, q_out, w):
    """Move items from q_in to q_out, prefixing each with the worker id w.

    NOTE(review): the empty() check and the following get() are two separate
    calls. With several workers, another process can take the last item in
    between, leaving this worker blocked in q_in.get() with nothing to read.
    """
    # Presumably gives the parent time to finish loading q_in before the first
    # empty() check -- TODO confirm.
    time.sleep(1)
    while not q_in.empty():
        # Simple code standing in for more complex operation
         q_out.put(str(w) + '_' + str(q_in.get()))

def setup_func(x):
    """Create the queue pair used by the workers.

    Args:
        x: Number of work items; the integers 0..x-1 are queued as input.

    Returns:
        A ``(q_in, q_out)`` tuple: the pre-loaded input queue and an empty
        output queue.
    """
    pending, results = Queue(), Queue()
    for item in range(x):
        pending.put(item)
    return pending, results

def test_func(num_cores, q_in, q_out):
    """Start num_cores workers on q_in/q_out and return what q_out holds.

    Each worker runs worker_func with its loop index as the worker id. All
    workers are joined before q_out is read.
    """
    processes = []

    # Launch one process per requested core; w doubles as the worker id.
    for w in range(num_cores):
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()

    # Wait for every worker to exit (this is where the reported hang occurs).
    for p in processes:
        p.join()

    # Collect whatever the workers produced, until empty() reports True.
    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())

    return(output_ls)

# Reproduction: the same 1000-item workload run with different worker counts.
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2

# Fresh queues for the second run.
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5

您有多个进程同时从同一个队列中取数据。检查 empty() 时队列里可能还有数据,但等到您真正调用 get() 时,另一个进程已经把它取走了,于是 get() 会一直阻塞,进程也就无法退出。由于多线程/多进程的语义,multiprocessing.Queue.empty() 的结果并不可靠。

另一种方法是在队列的末尾放置一个进程结束哨兵,每个进程一个。当进程看到哨兵时,它就会退出。对于您的情况,None 是一个不错的选择。

from multiprocessing import Process, Queue
import time

def worker_func(q_in, q_out, w):
    """Consume q_in until a ``None`` sentinel arrives, tagging items with w.

    Args:
        q_in: Queue of work items, terminated by one ``None`` per worker.
        q_out: Queue receiving ``"<w>_<item>"`` strings.
        w: Identifier of this worker, used as the result prefix.
    """
    time.sleep(1)
    prefix = str(w) + '_'
    item = q_in.get()
    # A blocking get() is safe here: the sentinel guarantees a final message.
    while item is not None:
        q_out.put(prefix + str(item))
        item = q_in.get()

def setup_func(x):
    """Return ``(q_in, q_out)`` with q_in holding the integers 0..x-1.

    Args:
        x: How many work items to enqueue on the input queue.

    Returns:
        A tuple of the loaded input queue and a new, empty output queue.
    """
    work_queue = Queue()
    result_queue = Queue()
    for number in range(x):
        work_queue.put(number)
    return work_queue, result_queue

def test_func(num_cores, q_in, q_out):
    processes = []

    for w in range(num_cores):
        q_in.put(None)
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())

    return(output_ls)

# The guard matters on platforms whose start method re-imports this module in
# each child (spawn/forkserver): without it every worker would re-run the
# driver code below.
if __name__ == "__main__":
    q_in, q_out = setup_func(1000)
    test_func(1, q_in, q_out)  # Returns without issue for num_cores = 1 or 2

    q_in, q_out = setup_func(1000)
    test_func(5, q_in, q_out)  # With sentinels this no longer hangs for num_cores = 5