是否可以按顺序启动 Pool 进程?
Is it possible to start Pool processes sequentially?
以下代码启动了三个进程,它们在一个池中以处理 20 个 worker 调用:
import multiprocessing
def worker(nr):
print(nr)
numbers = [i for i in range(20)]
if __name__ == '__main__':
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes=3)
results = pool.map(worker, numbers)
pool.close()
pool.join()
有没有办法按顺序启动进程(而不是让它们同时启动),并在每个进程启动之间插入延迟?
如果不使用 Pool
,我会在循环中使用 multiprocessing.Process(target=worker, args=(nr,)).start()
,一个接一个地启动它们,并根据需要插入延迟。不过,我发现 Pool
非常有用(连同 map
调用),所以如果可能的话,我很乐意保留它。
你不能做这样简单的事情吗:
from multiprocessing import Process
from time import sleep
def f(n):
print 'started job: '+str(n)
sleep(3)
print 'ended job: '+str(n)
if __name__ == '__main__':
for i in range(0,100):
p = Process(target=f, args=(i,))
p.start()
sleep(1)
结果
started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
根据 documentation,不存在对池化进程的此类控制。但是,您可以用锁模拟它:
import multiprocessing
import time
lock = multiprocessing.Lock()
def worker(nr):
lock.acquire()
time.sleep(0.100)
lock.release()
print(nr)
numbers = [i for i in range(20)]
if __name__ == '__main__':
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes=3)
results = pool.map(worker, numbers)
pool.close()
pool.join()
您的 3 个进程仍将同时启动。好吧,我的意思是您无法控制哪个进程首先开始执行回调。但至少你得到了你的延迟。这有效地让每个工人 "starting"(但实际上, 继续 )在指定的时间间隔。
根据以下讨论修正:
请注意,在 Windows 上,无法从父进程继承锁。相反,您可以使用 multiprocessing.Manager().Lock()
在进程之间传递一个全局锁对象(当然会有额外的 IPC 开销)。全局锁对象也需要在每个进程中初始化。这看起来像:
from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt
def worker(nr):
glock.acquire()
print('started job: {} at {}'.format(nr, dt.now()))
time.sleep(1)
glock.release()
print('ended job: {} at {}'.format(nr, dt.now()))
numbers = [i for i in range(6)]
def init(lock):
global glock
glock = lock
if __name__ == '__main__':
multiprocessing.freeze_support()
lock = multiprocessing.Manager().Lock()
pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
results = pool.map(worker, numbers)
pool.close()
pool.join()
您可以尝试定义一个缓慢生成值的函数吗?
def get_numbers_on_delay(numbers, delay):
for i in numbers:
yield i
time.sleep(delay)
然后:
results = pool.map(worker, get_numbers_on_delay(numbers, 5))
我还没有测试过,所以我不确定,但试一试。
由于某种原因我无法获得锁定答案,所以我以这种方式实现了它。
我意识到这个问题很老了,但也许其他人也有同样的问题。
它生成所有类似于锁定解决方案的进程,但根据进程名称编号在工作前休眠。
from multiprocessing import current_process
from re import search
from time import sleep
def worker():
process_number = search('\d+', current_process().name).group()
time_between_workers = 5
sleep(time_between_workers * int(process_number))
#do your work here
由于赋予进程的名称似乎是唯一且递增的,因此这将获取进程的编号并根据该编号休眠。
SpawnPoolWorker-1 休眠 1 * 5 秒,SpawnPoolWorker-2 休眠 2 * 5 秒等
以下代码启动了三个进程,它们在一个池中以处理 20 个 worker 调用:
import multiprocessing
def worker(nr):
print(nr)
numbers = [i for i in range(20)]
if __name__ == '__main__':
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes=3)
results = pool.map(worker, numbers)
pool.close()
pool.join()
有没有办法按顺序启动进程(而不是让它们同时启动),并在每个进程启动之间插入延迟?
如果不使用 Pool
,我会在循环中使用 multiprocessing.Process(target=worker, args=(nr,)).start()
,一个接一个地启动它们,并根据需要插入延迟。不过,我发现 Pool
非常有用(连同 map
调用),所以如果可能的话,我很乐意保留它。
你不能做这样简单的事情吗:
from multiprocessing import Process
from time import sleep
def f(n):
print 'started job: '+str(n)
sleep(3)
print 'ended job: '+str(n)
if __name__ == '__main__':
for i in range(0,100):
p = Process(target=f, args=(i,))
p.start()
sleep(1)
结果
started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
根据 documentation,不存在对池化进程的此类控制。但是,您可以用锁模拟它:
import multiprocessing
import time
lock = multiprocessing.Lock()
def worker(nr):
lock.acquire()
time.sleep(0.100)
lock.release()
print(nr)
numbers = [i for i in range(20)]
if __name__ == '__main__':
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes=3)
results = pool.map(worker, numbers)
pool.close()
pool.join()
您的 3 个进程仍将同时启动。好吧,我的意思是您无法控制哪个进程首先开始执行回调。但至少你得到了你的延迟。这有效地让每个工人 "starting"(但实际上, 继续 )在指定的时间间隔。
根据以下讨论修正:
请注意,在 Windows 上,无法从父进程继承锁。相反,您可以使用 multiprocessing.Manager().Lock()
在进程之间传递一个全局锁对象(当然会有额外的 IPC 开销)。全局锁对象也需要在每个进程中初始化。这看起来像:
from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt
def worker(nr):
glock.acquire()
print('started job: {} at {}'.format(nr, dt.now()))
time.sleep(1)
glock.release()
print('ended job: {} at {}'.format(nr, dt.now()))
numbers = [i for i in range(6)]
def init(lock):
global glock
glock = lock
if __name__ == '__main__':
multiprocessing.freeze_support()
lock = multiprocessing.Manager().Lock()
pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
results = pool.map(worker, numbers)
pool.close()
pool.join()
您可以尝试定义一个缓慢生成值的函数吗?
def get_numbers_on_delay(numbers, delay):
for i in numbers:
yield i
time.sleep(delay)
然后:
results = pool.map(worker, get_numbers_on_delay(numbers, 5))
我还没有测试过,所以我不确定,但试一试。
由于某种原因我无法获得锁定答案,所以我以这种方式实现了它。 我意识到这个问题很老了,但也许其他人也有同样的问题。
它生成所有类似于锁定解决方案的进程,但根据进程名称编号在工作前休眠。
from multiprocessing import current_process
from re import search
from time import sleep
def worker():
process_number = search('\d+', current_process().name).group()
time_between_workers = 5
sleep(time_between_workers * int(process_number))
#do your work here
由于赋予进程的名称似乎是唯一且递增的,因此这将获取进程的编号并根据该编号休眠。 SpawnPoolWorker-1 休眠 1 * 5 秒,SpawnPoolWorker-2 休眠 2 * 5 秒等