Python 多进程从队列中获取结果

Python multiprocess get result from queue

我是 运行 一个多处理脚本,它应该在大约 0.01 秒内启动 2.000.000 个作业。每个作业将结果放入从 Queue 导入的队列中,因为来自 Multiprocessing 模块的队列无法处理其中超过 517 个结果。

我的程序在从队列中获取结果之前冻结。这是我的多进程函数的核心:

while argslist != []:
    p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
    jobs.append(p)
    p.start()
for p in jobs:
    p.join()
print 'over'

res = [result_queue.get() for p in jobs]
print 'got it'

输出:"over" 但永远不会 "got it"

当我替换

result_queue.get() 

来自

result_queue.get_nowait()

我收到 raise Empty 错误,说我的队列是空的...

但是如果我在我的内部函数中的 queue.put() 之后执行 queue.get() ,那么它就可以工作,向我表明我的函数很好地归档了队列..

queue.Queue 不在进程之间共享,因此它无法使用,您必须使用 multiprocessing.Queue.

为避免死锁,您不应在从队列中获取结果之前加入进程。 multiprocessing.Queue 受到其底层管道缓冲区的有效限制,因此如果填满,则无法将更多项目刷新到管道并且 queue.put() 将阻塞,直到消费者调用 queue.get(),但如果消费者正在加入一个阻塞的进程,那么你就有了死锁。

您可以通过使用 multiprocessing.Pool 及其 map() 来避免所有这些。

谢谢 mata,我切换回了 multiprocessing.Queue(),但我不想使用池,因为我想跟踪有多少作业完成了 运行。我终于添加了一个 if 语句来定期清空我的队列。

def multiprocess(function, argslist, ncpu):
    total = len(argslist)
    done = 0
    result_queue = mp.Queue(0)
    jobs = []
    res = []
    while argslist != []:
        if len(mp.active_children()) < ncpu:
            p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
            jobs.append(p)
            p.start()
            done += 1
            print "\r",float(done)/total*100,"%", #here is to keep track
        # here comes my emptying step
        if len(jobs) == 500:
            tmp = [result_queue.get() for p in jobs]
            for r in tmp:
                res.append(r)
            result_queue = mp.Queue(0)
            jobs = []

    tmp = [result_queue.get() for p in jobs]
    for r in tmp:
        res.append(r)
    return res

然后我想到了这个问题:
500 个作业的限制是因为 python 还是因为我的机器或系统?
如果在其他条件下使用我的多处理功能,这个阈值会不会有问题?