如何判断 apply_async 函数是否已启动或它是否仍在 multiprocessing.Pool 队列中
How to tell if an apply_async function has started or if it's still in the queue with multiprocessing.Pool
我正在使用 python 的 multiprocessing.Pool 和 apply_async 来调用一堆函数。
如何判断某个函数是否已开始由池中的成员处理,或者它是否在队列中?
例如:
import multiprocessing
import time
def func(t):
#take some time processing
print 'func({}) started'.format(t)
time.sleep(t)
pool = multiprocessing.Pool()
results = [pool.apply_async(func, [t]) for t in [100]*50] #adds 50 func calls to the queue
对于 results
中的每个 AsyncResult
,您可以调用 ready()
或 get(0)
来查看函数是否完成 运行。但是你怎么知道func started但是还没有结束呢?
即对于给定的 AsyncResult 对象(即给定的结果元素),有没有办法查看该函数是否已被调用或者它是否位于池的队列中?
首先,从结果列表中删除已完成的作业
results = [r for r in results if not r.ready()]
待处理的进程数是结果列表的长度:
pending = len(results)
待定但未开始的总数为待定 - pool_size
not_started = pending - pool_size
pool_size 将是 multiprocessing.cpu_count() 如果像您一样使用默认参数创建 Pool
更新:
在最初误解了这个问题之后,这里有一种方法可以解决 OP 所询问的问题。
我怀疑可以将此功能添加到 Pool class 中而不会有太多麻烦,因为 AsyncResult 是由带有 Queue 的 Pool 实现的。该队列也可以在内部使用以指示是否已启动。
但这里有一种使用 Pool 和 Pipe 来实现的方法。注意:这在 Python 2.x 中不起作用——不知道为什么。在 Python 3.8.
中测试
import multiprocessing
import time
import os
def worker_function(pipe):
pipe.send('started')
print('[{}] started pipe={}'.format(os.getpid(), pipe))
time.sleep(3)
pipe.close()
def test():
pool = multiprocessing.Pool(processes=2)
print('[{}] pool={}'.format(os.getpid(), pool))
workers = []
for x in range(1, 4):
parent, child = multiprocessing.Pipe()
pool.apply_async(worker_function, (child,))
worker = {'name': 'worker{}'.format(x), 'pipe': parent, 'started': False}
workers.append(worker)
pool.close()
while True:
for worker in workers:
if worker.get('started'):
continue
pipe = worker.get('pipe')
if pipe.poll(0.1):
message = pipe.recv()
print('[{}] {} says {}'.format(os.getpid(), worker.get('name'), message))
worker['started'] = True
pipe.close()
count_in_queue = len(workers)
for worker in workers:
if worker.get('started'):
count_in_queue -= 1
print('[{}] count_in_queue = {}'.format(os.getpid(), count_in_queue))
if not count_in_queue:
break
time.sleep(0.5)
pool.join()
if __name__ == '__main__':
test()
我正在使用 python 的 multiprocessing.Pool 和 apply_async 来调用一堆函数。
如何判断某个函数是否已开始由池中的成员处理,或者它是否在队列中?
例如:
import multiprocessing
import time
def func(t):
#take some time processing
print 'func({}) started'.format(t)
time.sleep(t)
pool = multiprocessing.Pool()
results = [pool.apply_async(func, [t]) for t in [100]*50] #adds 50 func calls to the queue
对于 results
中的每个 AsyncResult
,您可以调用 ready()
或 get(0)
来查看函数是否完成 运行。但是你怎么知道func started但是还没有结束呢?
即对于给定的 AsyncResult 对象(即给定的结果元素),有没有办法查看该函数是否已被调用或者它是否位于池的队列中?
首先,从结果列表中删除已完成的作业
results = [r for r in results if not r.ready()]
待处理的进程数是结果列表的长度:
pending = len(results)
待定但未开始的总数为待定 - pool_size
not_started = pending - pool_size
pool_size 将是 multiprocessing.cpu_count() 如果像您一样使用默认参数创建 Pool
更新: 在最初误解了这个问题之后,这里有一种方法可以解决 OP 所询问的问题。
我怀疑可以将此功能添加到 Pool class 中而不会有太多麻烦,因为 AsyncResult 是由带有 Queue 的 Pool 实现的。该队列也可以在内部使用以指示是否已启动。
但这里有一种使用 Pool 和 Pipe 来实现的方法。注意:这在 Python 2.x 中不起作用——不知道为什么。在 Python 3.8.
中测试import multiprocessing
import time
import os
def worker_function(pipe):
pipe.send('started')
print('[{}] started pipe={}'.format(os.getpid(), pipe))
time.sleep(3)
pipe.close()
def test():
pool = multiprocessing.Pool(processes=2)
print('[{}] pool={}'.format(os.getpid(), pool))
workers = []
for x in range(1, 4):
parent, child = multiprocessing.Pipe()
pool.apply_async(worker_function, (child,))
worker = {'name': 'worker{}'.format(x), 'pipe': parent, 'started': False}
workers.append(worker)
pool.close()
while True:
for worker in workers:
if worker.get('started'):
continue
pipe = worker.get('pipe')
if pipe.poll(0.1):
message = pipe.recv()
print('[{}] {} says {}'.format(os.getpid(), worker.get('name'), message))
worker['started'] = True
pipe.close()
count_in_queue = len(workers)
for worker in workers:
if worker.get('started'):
count_in_queue -= 1
print('[{}] count_in_queue = {}'.format(os.getpid(), count_in_queue))
if not count_in_queue:
break
time.sleep(0.5)
pool.join()
if __name__ == '__main__':
test()