强制多处理池迭代参数

Force Multiprocessing Pool to iterate over argument

我正在使用多处理池 运行 一个函数来一遍又一遍地处理多个参数。我使用一个由另一个线程填充的作业列表和一个 job_handler 函数来处理每个作业。我的问题是,当列表变空时,池将结束该功能。我想让池保持活动状态,等到列表填满。其实有两种方案可以解决这个问题。

1.Use 一个池,但会在列表变空后结束:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    # wait for job_adder to add to list
    sleep(1)
    pool.map_async(job_handler, jobs)
    while True:
        pass

2.Multiple map_async:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    while True:
        for job in jobs:
            pool1 = pool.map_async(job_handler, (job,))
            jobs.remove(job)

两者有什么区别?我认为第一个选项会更好,因为地图本身会处理迭代。我的目标是获得更好的性能来单独处理每项工作。

在很多情况下都需要“放慢”Pool。这种情况比 some:

简单
q=queue.Queue()
m=pool.imap(iter(q.get,None))

您也可以使用imap_unorderedNone是一个哨兵终止PoolPool 必须使用一个线程来收集任务(因为这些函数“比 map() 更懒惰”),并且它会根据需要在 q 上阻塞。