How to join a list of multiprocessing.Process() at the same time?

Given a list() of running multiprocessing.Process instances, how can I join on all of them and return as soon as one of them exits, without using a join() timeout and looping?

Example

from multiprocessing import Process
from random import randint
from time import sleep
def run():
    sleep(randint(0,5))
running = [ Process(target=run) for i in range(10) ]

for p in running:
    p.start()

How can I block until at least one of the Processes in running has exited?

What I don't want to do is:

exit = False
while not exit:
    for p in running:
        p.join(0)
        if p.exitcode is not None:
            exit = True
            break

You can use multiprocessing.connection.wait() (Python 3.3+) to wait on several Process.sentinels at once. A sentinel becomes ready as soon as its process exits, which unblocks connection.wait().

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

For both Unix and Windows, an object can appear in object_list if it is

  • a readable Connection object;

  • a connected and readable socket.socket object; or

  • the sentinel attribute of a Process object.

A connection or socket object is ready when there is data available to be read from it, or the other end has been closed. ...

from multiprocessing import Process, connection, current_process
from random import randint
from time import sleep
from datetime import datetime


def run():
    sleep(randint(2,10))
    print(f"{datetime.now()} {current_process().name} exiting")


if __name__ == '__main__':

    pool = [Process(target=run) for _ in range(4)]

    for p in pool:
        p.start()

    print(f"{datetime.now()} {current_process().name} waiting")
    connection.wait(p.sentinel for p in pool)
    print(f"{datetime.now()} {current_process().name} unblocked")

Example output:

2019-07-22 21:54:07.061989 MainProcess waiting
2019-07-22 21:54:09.062498 Process-3 exiting
2019-07-22 21:54:09.063565 MainProcess unblocked
2019-07-22 21:54:09.064391 Process-4 exiting
2019-07-22 21:54:14.068392 Process-2 exiting
2019-07-22 21:54:17.062045 Process-1 exiting

Process finished with exit code 0
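Because connection.wait() returns the sentinels that are ready, a dict from sentinel back to the process tells you which one exited. The sketch below is a minimal variation on the example above, not part of the original answer: the names run and exit_order and the fixed delays are illustrative. It calls connection.wait() again on the remaining sentinels, so each call blocks (no polling) and the indices come out in the order the processes exited.

```python
from multiprocessing import Process, connection
from time import sleep


def run(delay):
    # Simulated work: just sleep for the given number of seconds
    sleep(delay)


def exit_order(delays):
    """Start one process per delay and return their indices in the order they exit."""
    procs = [Process(target=run, args=(d,)) for d in delays]
    for p in procs:
        p.start()

    # Map each sentinel back to the index of its process
    pending = {p.sentinel: i for i, p in enumerate(procs)}
    order = []
    while pending:
        # Blocks until at least one of the remaining processes has exited
        for sentinel in connection.wait(list(pending)):
            order.append(pending.pop(sentinel))

    for p in procs:
        p.join()  # reap the exited processes
    return order


if __name__ == '__main__':
    print(exit_order([1.5, 0.05, 1.5]))
```

If you only need the first exit, as in the question, a single connection.wait() call is enough; the loop is only there to handle every process as it finishes.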

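There really isn't a way to do exactly what you want as specified; that's not how the API is set up. If you can move the problem up to the level where the list of processes is created, though, there are a number of good solutions.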

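Probably the best approach is multiprocessing.Pool.imap_unordered(). A Pool manages a set of worker processes, and imap_unordered() takes a function and an iterable of inputs and feeds those inputs to the workers. It returns an iterator whose next() method waits until a result is ready, so it yields each value as soon as it becomes available, in completion order rather than input order.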
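A minimal sketch of that approach, assuming the work can be expressed as a function applied to a list of inputs (run and first_then_rest are hypothetical names, and the random sleep only stands in for real work): the first next() call on the iterator blocks until whichever task finishes first.

```python
from multiprocessing import Pool
from random import random
from time import sleep


def run(i):
    # Simulate a task of random length (up to 0.5 s) and return its input
    sleep(random() * 0.5)
    return i


def first_then_rest(n=10):
    with Pool(processes=4) as pool:
        results = pool.imap_unordered(run, range(n))
        first = next(results)  # blocks until any one task has finished
        rest = list(results)   # the remaining results, in completion order
    return first, rest


if __name__ == '__main__':
    first, rest = first_then_rest()
    print(f"first result: {first}, remaining: {rest}")
```

All results are consumed before the with block exits, which matters because Pool.__exit__ calls terminate() on the workers.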

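If you can't turn your problem into a function plus inputs, the next option is a synchronization primitive. For what I'm guessing you want to accomplish, I'd use a semaphore: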

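from multiprocessing import Process, Semaphore

def do_some_work():
    pass  # placeholder for the real job

def do_post_processing():
    pass  # placeholder for whatever should run after each worker finishes

def worker(the_sem):
    do_some_work()
    the_sem.release()  # tell the parent that this worker is done

if __name__ == '__main__':
    sem = Semaphore(0)
    myprocs = [Process(target=worker, args=(sem,)) for _ in range(10)]

    # in your code:
    for p in myprocs:
        p.start()
    done = 0
    while done < len(myprocs):
        sem.acquire()  # blocks until at least one worker has released
        done += 1
        do_post_processing()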

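If you don't actually need the loop, an Event works too: just wait for the first process to set it. If you really can't modify the function that creates the processes in any way, the last solution I can think of (a pretty awful one, haha) is to use a thread pool to set up one waiter per process.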

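from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def waiter(proc):
    proc.join()
    return proc

# `processes` is your list of already-started Process objects.
# Use one thread per process so no waiter sits queued behind another.
with ThreadPoolExecutor(max_workers=len(processes)) as executor:
    futures = [executor.submit(waiter, p) for p in processes]
    # this returns as soon as one process has exited
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    first = next(iter(done)).result()  # a Process that has exited
    # note: leaving the `with` block waits for ALL waiters, i.e. all processes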
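For the Event variant mentioned above, here is a minimal sketch under the assumption that you control the worker function (run and wait_for_any are hypothetical names). Note that the parent is woken when a worker calls set(), just before that worker exits, and the Event alone does not tell you which worker it was.

```python
from multiprocessing import Event, Process
from random import random
from time import sleep


def run(done_event):
    # Simulate work of random length, then signal that this worker has finished
    sleep(random())
    done_event.set()


def wait_for_any(n=4, timeout=30):
    done_event = Event()
    procs = [Process(target=run, args=(done_event,)) for _ in range(n)]
    for p in procs:
        p.start()
    # Blocks until the first worker calls set(); returns False only on timeout
    signalled = done_event.wait(timeout)
    for p in procs:
        p.join()  # clean up here; real code might leave the others running
    return signalled


if __name__ == '__main__':
    print("a worker finished:", wait_for_any())
```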