How to join a list of multiprocessing.Process() at the same time?
Given a list() of running multiprocessing.Process instances, how can I join on all of them and return as soon as one exits, without using a Process.join() timeout and a loop?
Example
from multiprocessing import Process
from random import randint
from time import sleep
def run():
    sleep(randint(0, 5))
running = [Process(target=run) for i in range(10)]
for p in running:
    p.start()
How can I block until at least one Process in running has exited?
What I don't want to do is:
exit = False
while not exit:
    for p in running:
        p.join(0)
        if p.exitcode is not None:
            exit = True
            break
You can use multiprocessing.connection.wait() (Python 3.3+) to wait on several Process.sentinels at once. A sentinel becomes ready as soon as its process exits, which unblocks connection.wait().
multiprocessing.connection.wait(object_list, timeout=None)
Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.
For both Unix and Windows, an object can appear in object_list if it is
a readable Connection object;
a connected and readable socket.socket object; or
the sentinel attribute of a Process object.
A connection or socket object is ready when there is data available to be read from it, or the other end has been closed. ...
from multiprocessing import Process, connection, current_process
from random import randint
from time import sleep
from datetime import datetime
def run():
    sleep(randint(2, 10))
    print(f"{datetime.now()} {current_process().name} exiting")
if __name__ == '__main__':
    pool = [Process(target=run) for _ in range(4)]
    for p in pool:
        p.start()
    print(f"{datetime.now()} {current_process().name} waiting")
    # Blocks until at least one sentinel is ready, i.e. one process has exited
    connection.wait(p.sentinel for p in pool)
    print(f"{datetime.now()} {current_process().name} unblocked")
Example output:
2019-07-22 21:54:07.061989 MainProcess waiting
2019-07-22 21:54:09.062498 Process-3 exiting
2019-07-22 21:54:09.063565 MainProcess unblocked
2019-07-22 21:54:09.064391 Process-4 exiting
2019-07-22 21:54:14.068392 Process-2 exiting
2019-07-22 21:54:17.062045 Process-1 exiting
Process finished with exit code 0
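Because connection.wait() returns the list of sentinels that are ready, you can also map them back to their Process objects and handle each process as it exits. This is a minimal sketch, assuming a hypothetical run(delay) worker that only sleeps; join_as_completed is an illustrative helper name, not part of multiprocessing:

```python
from multiprocessing import Process, connection
from time import sleep


def run(delay):
    # Hypothetical worker: it only sleeps for `delay` seconds.
    sleep(delay)


def join_as_completed(procs):
    """Yield each started Process as soon as it exits, in completion order."""
    # Map each sentinel back to its Process so we can tell which one finished.
    pending = {p.sentinel: p for p in procs}
    while pending:
        # Blocks (no timeout) until at least one sentinel is ready,
        # then returns only the ready ones.
        for sentinel in connection.wait(list(pending)):
            proc = pending.pop(sentinel)
            proc.join()  # the process has already exited, so this returns at once
            yield proc


if __name__ == '__main__':
    delays = [0.6, 0.2, 0.4]
    procs = [Process(target=run, args=(d,), name=f"worker-{d}") for d in delays]
    for p in procs:
        p.start()
    for p in join_as_completed(procs):
        print(p.name, "exited with code", p.exitcode)
```

There is still a loop here, but no polling: each connection.wait() call blocks until something actually exits. If you only care about the first one, take next(join_as_completed(procs)).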
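There isn't really a way to do exactly what you want as specified; that's not how the API is set up. However, if you can move this up to the level where the list of processes is created, there are a number of good solutions.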
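Probably the best way is to use multiprocessing.Pool.imap_unordered(). A Pool starts a set of worker processes; imap_unordered() takes a function and an iterable of inputs and feeds the inputs to those workers. It returns an iterator whose next method waits until a value is ready, so you get each result as soon as it is available, in completion order.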
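A minimal sketch of that approach, assuming the work can be expressed as a function of a single input; work and first_result are hypothetical names used for illustration:

```python
from multiprocessing import Pool
from time import sleep


def work(delay):
    # Hypothetical task: sleep, then return the input so the order is visible.
    sleep(delay)
    return delay


def first_result(delays):
    with Pool(processes=len(delays)) as pool:
        # next() blocks only until the first task finishes.
        return next(pool.imap_unordered(work, delays))


if __name__ == '__main__':
    print("first:", first_result([0.6, 0.2, 0.4]))
    with Pool(processes=3) as pool:
        # Results arrive in completion order, not submission order.
        for result in pool.imap_unordered(work, [0.6, 0.2, 0.4]):
            print("finished:", result)
```

Leaving a Pool's with block calls Pool.terminate(), so in first_result the tasks that are still running are stopped once the first result arrives. To collect every result, iterate over imap_unordered() inside the with block, as the __main__ part does.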
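If you can't turn your problem into a function plus inputs, the next solution is to use a synchronization primitive. For what I'm guessing you're trying to accomplish, I'd use a semaphore: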
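from multiprocessing import Process, Semaphore
from random import randint
from time import sleep
def worker(the_sem):
    try:
        sleep(randint(0, 5))  # placeholder for do_some_work
    finally:
        the_sem.release()     # tell the parent this worker is done
if __name__ == '__main__':
    sem = Semaphore(0)
    myprocs = [Process(target=worker, args=(sem,)) for _ in range(10)]
    # in your code:
    for p in myprocs:
        p.start()
    done = 0
    while done < len(myprocs):
        sem.acquire()  # blocks until some worker has released
        done += 1
        # do_post_processing() would go here (placeholder)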
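If you don't actually need the loop, an Event works too: just wait for the first process to set it. If you really can't modify the function that creates the processes in any way, the last solution I can think of is (pretty gross, haha) to use a thread pool to set up one waiter per process.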
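A sketch of the Event variant, assuming you control the worker function; worker and wait_for_first are hypothetical names:

```python
from multiprocessing import Event, Process
from time import sleep


def worker(delay, finished):
    # Hypothetical worker: does its work (a sleep here), then signals.
    try:
        sleep(delay)
    finally:
        finished.set()  # set even if the work raised, so the parent never hangs


def wait_for_first(delays):
    finished = Event()
    procs = [Process(target=worker, args=(d, finished)) for d in delays]
    for p in procs:
        p.start()
    finished.wait()  # blocks until any worker calls set()
    return procs


if __name__ == '__main__':
    procs = wait_for_first([0.6, 0.2, 0.4])
    print("at least one worker finished")
    for p in procs:
        p.join()
```

The event signals that a worker finished its work, which happens just before that process exits; call join() on a process if you need it to be fully gone.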
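from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
def waiter(proc):
    proc.join()  # blocks this helper thread until proc exits
    return proc
# `processes` is your existing list of started Process objects.
# One thread per process, so every process is watched from the start.
executor = ThreadPoolExecutor(max_workers=len(processes))
futures = [executor.submit(waiter, p) for p in processes]
# this will return as soon as one completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
first_exited = next(iter(done)).result()
# No `with` block: leaving it would wait for every process to exit.
executor.shutdown(wait=False)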