是否可以按顺序启动 Pool 进程?

Is it possible to start Pool processes sequentially?

以下代码启动了三个进程,它们在一个池中以处理 20 个 worker 调用:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

有没有办法按顺序启动进程(而不是让它们同时启动),并在每个进程启动之间插入延迟?

如果不使用 Pool,我会在循环中使用 multiprocessing.Process(target=worker, args=(nr,)).start(),一个接一个地启动它们,并根据需要插入延迟。不过,我发现 Pool 非常有用(连同 map 调用),所以如果可能的话,我很乐意保留它。

你不能做这样简单的事情吗:

from multiprocessing import Process
from time import sleep

def f(n):
    print 'started job: '+str(n)
    sleep(3)
    print 'ended job: '+str(n)

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

结果

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5

根据 documentation,不存在对池化进程的此类控制。但是,您可以用锁模拟它:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

您的 3 个进程仍将同时启动。好吧,我的意思是您无法控制哪个进程首先开始执行回调。但至少你得到了你的延迟。这有效地让每个工人 "starting"(但实际上, 继续 )在指定的时间间隔。

根据以下讨论修正:

请注意,在 Windows 上,无法从父进程继承锁。相反,您可以使用 multiprocessing.Manager().Lock() 在进程之间传递一个全局锁对象(当然会有额外的 IPC 开销)。全局锁对象也需要在每个进程中初始化。这看起来像:

from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

您可以尝试定义一个缓慢生成值的函数吗?

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

然后:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

我还没有测试过,所以我不确定,但试一试。

由于某种原因我无法获得锁定答案,所以我以这种方式实现了它。 我意识到这个问题很老了,但也许其他人也有同样的问题。

它生成所有类似于锁定解决方案的进程,但根据进程名称编号在工作前休眠。

from multiprocessing import current_process
from re import search
from time import sleep

def worker():
    process_number = search('\d+', current_process().name).group()
    time_between_workers = 5
    sleep(time_between_workers * int(process_number))
    #do your work here

由于赋予进程的名称似乎是唯一且递增的,因此这将获取进程的编号并根据该编号休眠。 SpawnPoolWorker-1 休眠 1 * 5 秒,SpawnPoolWorker-2 休眠 2 * 5 秒等