有没有办法限制提交给工人池的数量?
is there a way to limit how much gets submitted to a Pool of workers?
我有一个工人池并使用 apply_async
向他们提交工作。
我不关心应用于每个项目的函数的结果。
该池似乎接受任意数量的 apply_async
调用,无论数据有多大或工作人员跟上工作的速度有多快。
有没有办法让 apply_async
在等待处理的项目达到一定数量时立即阻塞?我确信在内部,池正在使用队列,所以只为队列使用最大大小是微不足道的吗?
如果不支持,提交一份大报告是否有意义,因为这看起来像是非常基本的功能,添加起来相当微不足道?
如果为了完成这项工作而必须从本质上重新实现 Pool 的整个逻辑,那将是一种耻辱。
这是一些非常基本的代码:
from multiprocessing import Pool
dowork(item):
# process the item (for side effects, no return value needed)
pass
pool = Pool(nprocesses)
for work in getmorework():
# this should block if we already have too many work waiting!
pool.apply_async(dowork, (work,))
pool.close()
pool.join()
是这样的吗?
import multiprocessing
import time
worker_count = 4
mp = multiprocessing.Pool(processes=worker_count)
workers = [None] * worker_count
while True:
try:
for i in range(worker_count):
if workers[i] is None or workers[i].ready():
workers[i] = mp.apply_async(dowork, args=next(getmorework()))
except StopIteration:
break
time.sleep(1)
我不知道您希望每个工人完成的速度有多快,time.sleep
可能是必要的,也可能不是必要的,或者可能需要不同的时间或其他什么。
另一种方法可能是直接使用 Queue
:
from multiprocessing import Process, JoinableQueue
from time import sleep
from random import random
def do_work(i):
print(f"worker {i}")
sleep(random())
print(f"done {i}")
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
def generator(n):
for i in range(n):
print(f"gen {i}")
yield i
# 1 = allow generator to get this far ahead
q = JoinableQueue(1)
# 2 = maximum amount of parallelism
procs = [Process(target=worker) for _ in range(2)]
# and get them running
for p in procs:
p.daemon = True
p.start()
# schedule 10 items for processing
for item in generator(10):
q.put(item)
# wait for jobs to finish executing
q.join()
# signal workers to finish up
for p in procs:
q.put(None)
# wait for workers to actually finish
for p in procs:
p.join()
主要是从示例 Python 的 queue
模块中偷来的:
https://docs.python.org/3/library/queue.html#queue.Queue.join
我有一个工人池并使用 apply_async
向他们提交工作。
我不关心应用于每个项目的函数的结果。
该池似乎接受任意数量的 apply_async
调用,无论数据有多大或工作人员跟上工作的速度有多快。
有没有办法让 apply_async
在等待处理的项目达到一定数量时立即阻塞?我确信在内部,池正在使用队列,所以只为队列使用最大大小是微不足道的吗?
如果不支持,提交一份大报告是否有意义,因为这看起来像是非常基本的功能,添加起来相当微不足道?
如果为了完成这项工作而必须从本质上重新实现 Pool 的整个逻辑,那将是一种耻辱。
这是一些非常基本的代码:
from multiprocessing import Pool
dowork(item):
# process the item (for side effects, no return value needed)
pass
pool = Pool(nprocesses)
for work in getmorework():
# this should block if we already have too many work waiting!
pool.apply_async(dowork, (work,))
pool.close()
pool.join()
是这样的吗?
import multiprocessing
import time
worker_count = 4
mp = multiprocessing.Pool(processes=worker_count)
workers = [None] * worker_count
while True:
try:
for i in range(worker_count):
if workers[i] is None or workers[i].ready():
workers[i] = mp.apply_async(dowork, args=next(getmorework()))
except StopIteration:
break
time.sleep(1)
我不知道您希望每个工人完成的速度有多快,time.sleep
可能是必要的,也可能不是必要的,或者可能需要不同的时间或其他什么。
另一种方法可能是直接使用 Queue
:
from multiprocessing import Process, JoinableQueue
from time import sleep
from random import random
def do_work(i):
print(f"worker {i}")
sleep(random())
print(f"done {i}")
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
def generator(n):
for i in range(n):
print(f"gen {i}")
yield i
# 1 = allow generator to get this far ahead
q = JoinableQueue(1)
# 2 = maximum amount of parallelism
procs = [Process(target=worker) for _ in range(2)]
# and get them running
for p in procs:
p.daemon = True
p.start()
# schedule 10 items for processing
for item in generator(10):
q.put(item)
# wait for jobs to finish executing
q.join()
# signal workers to finish up
for p in procs:
q.put(None)
# wait for workers to actually finish
for p in procs:
p.join()
主要是从示例 Python 的 queue
模块中偷来的:
https://docs.python.org/3/library/queue.html#queue.Queue.join