如何判断 apply_async 函数是否已启动或它是否仍在 multiprocessing.Pool 队列中

How to tell if an apply_async function has started or if it's still in the queue with multiprocessing.Pool

我正在使用 python 的 multiprocessing.Pool 和 apply_async 来调用一堆函数。

如何判断某个函数是否已开始由池中的成员处理,或者它是否在队列中?

例如:

import multiprocessing
import time

def func(t):
    #take some time processing
    print 'func({}) started'.format(t)
    time.sleep(t)

pool = multiprocessing.Pool()

results = [pool.apply_async(func, [t]) for t in [100]*50] #adds 50 func calls to the queue

对于 results 中的每个 AsyncResult,您可以调用 ready()get(0) 来查看函数是否完成 运行。但是你怎么知道func started但是还没有结束呢?

即对于给定的 AsyncResult 对象(即给定的结果元素),有没有办法查看该函数是否已被调用或者它是否位于池的队列中?

首先,从结果列表中删除已完成的作业

    results = [r for r in results if not r.ready()]

待处理的进程数是结果列表的长度:

    pending = len(results)

待定但未开始的总数为待定 - pool_size

    not_started = pending - pool_size

pool_size 将是 multiprocessing.cpu_count() 如果像您一样使用默认参数创建 Pool

更新: 在最初误解了这个问题之后,这里有一种方法可以解决 OP 所询问的问题。

我怀疑可以将此功能添加到 Pool class 中而不会有太多麻烦,因为 AsyncResult 是由带有 Queue 的 Pool 实现的。该队列也可以在内部使用以指示是否已启动。

但这里有一种使用 Pool 和 Pipe 来实现的方法。注意:这在 Python 2.x 中不起作用——不知道为什么。在 Python 3.8.

中测试
import multiprocessing
import time
import os

def worker_function(pipe):
    pipe.send('started')
    print('[{}] started pipe={}'.format(os.getpid(), pipe))
    time.sleep(3)
    pipe.close()

def test():
    pool = multiprocessing.Pool(processes=2)
    print('[{}] pool={}'.format(os.getpid(), pool))

    workers = []

    for x in range(1, 4):
        parent, child = multiprocessing.Pipe()
        pool.apply_async(worker_function, (child,))
        worker = {'name': 'worker{}'.format(x), 'pipe': parent, 'started': False}
        workers.append(worker)

    pool.close()

    while True:
        for worker in workers:
            if worker.get('started'):
                continue
            pipe = worker.get('pipe')
            if pipe.poll(0.1):
                message = pipe.recv()
                print('[{}] {} says {}'.format(os.getpid(), worker.get('name'), message))
                worker['started'] = True
                pipe.close()
        count_in_queue = len(workers)
        for worker in workers:
            if worker.get('started'):
                count_in_queue -= 1
        print('[{}] count_in_queue = {}'.format(os.getpid(), count_in_queue))
        if not count_in_queue:
            break
        time.sleep(0.5)

    pool.join()

if __name__ == '__main__':
    test()