强制多处理池迭代参数
Force Multiprocessing Pool to iterate over argument
我正在使用多处理池 运行 一个函数来一遍又一遍地处理多个参数。我使用一个由另一个线程填充的作业列表和一个 job_handler
函数来处理每个作业。我的问题是,当列表变空时,池将结束该功能。我想让池保持活动状态,等到列表填满。其实有两种方案可以解决这个问题。
1.Use 一个池,但会在列表变空后结束:
from multiprocessing import Pool
from threading import Thread
from time import sleep
def job_handler(i):
print("Doing job:", i)
sleep(0.5)
def job_adder():
i = 0
while True:
jobs.append(i)
i += 1
sleep(0.1)
if __name__ == "__main__":
pool = Pool(4)
jobs = []
thr = Thread(target=job_adder)
thr.start()
# wait for job_adder to add to list
sleep(1)
pool.map_async(job_handler, jobs)
while True:
pass
2.Multiple map_async
:
from multiprocessing import Pool
from threading import Thread
from time import sleep
def job_handler(i):
print("Doing job:", i)
sleep(0.5)
def job_adder():
i = 0
while True:
jobs.append(i)
i += 1
sleep(0.1)
if __name__ == "__main__":
pool = Pool(4)
jobs = []
thr = Thread(target=job_adder)
thr.start()
while True:
for job in jobs:
pool1 = pool.map_async(job_handler, (job,))
jobs.remove(job)
两者有什么区别?我认为第一个选项会更好,因为地图本身会处理迭代。我的目标是获得更好的性能来单独处理每项工作。
在很多情况下都需要“放慢”Pool
。这种情况比 some:
简单
q=queue.Queue()
m=pool.imap(iter(q.get,None))
您也可以使用imap_unordered
; None
是一个哨兵终止Pool
。 Pool
必须使用一个线程来收集任务(因为这些函数“比 map()
更懒惰”),并且它会根据需要在 q
上阻塞。
我正在使用多处理池 运行 一个函数来一遍又一遍地处理多个参数。我使用一个由另一个线程填充的作业列表和一个 job_handler
函数来处理每个作业。我的问题是,当列表变空时,池将结束该功能。我想让池保持活动状态,等到列表填满。其实有两种方案可以解决这个问题。
1.Use 一个池,但会在列表变空后结束:
from multiprocessing import Pool
from threading import Thread
from time import sleep
def job_handler(i):
print("Doing job:", i)
sleep(0.5)
def job_adder():
i = 0
while True:
jobs.append(i)
i += 1
sleep(0.1)
if __name__ == "__main__":
pool = Pool(4)
jobs = []
thr = Thread(target=job_adder)
thr.start()
# wait for job_adder to add to list
sleep(1)
pool.map_async(job_handler, jobs)
while True:
pass
2.Multiple map_async
:
from multiprocessing import Pool
from threading import Thread
from time import sleep
def job_handler(i):
print("Doing job:", i)
sleep(0.5)
def job_adder():
i = 0
while True:
jobs.append(i)
i += 1
sleep(0.1)
if __name__ == "__main__":
pool = Pool(4)
jobs = []
thr = Thread(target=job_adder)
thr.start()
while True:
for job in jobs:
pool1 = pool.map_async(job_handler, (job,))
jobs.remove(job)
两者有什么区别?我认为第一个选项会更好,因为地图本身会处理迭代。我的目标是获得更好的性能来单独处理每项工作。
在很多情况下都需要“放慢”Pool
。这种情况比 some:
q=queue.Queue()
m=pool.imap(iter(q.get,None))
您也可以使用imap_unordered
; None
是一个哨兵终止Pool
。 Pool
必须使用一个线程来收集任务(因为这些函数“比 map()
更懒惰”),并且它会根据需要在 q
上阻塞。