将任务添加到父级的多处理池

Add Task to Multiprocessing Pool of Parent

如何将新任务添加到我在父进程中初始化的 multiprocessing 池中?以下内容不起作用:

from multiprocessing import Pool


def child_task(x):
    # the child task spawns new tasks
    results = p.map(grandchild_task, [x])
    return results[0]


def grandchild_task(x):
    return x


if __name__ == '__main__':
    p = Pool(2)
    print(p.map(child_task, [0]))
    # Result: NameError: name 'p' is not defined

动机:我需要并行化一个由各种子任务组成的程序,这些子任务本身也有子任务(即孙任务)。仅并行化子任务或孙任务不会利用我所有的 CPU 个核心。

在我的用例中,我有各种子任务(可能 1-50 个)和每个子任务的许多孙子任务(可能 100-1000 个)。

备选方案:如果使用 Python 的多处理包无法做到这一点,我很乐意切换到另一个支持它的库。

有一个最小的可重现示例这样的东西,然后除此之外还删除了太多代码,最终得到的东西 (1) 可能过于简单化了,有危险,答案可能会遗漏标记和 (2) 不可能 运行 如图所示(您需要将创建 Pool 和提交任务的代码包含在由 if __name__ == '__main__': 语句控制的块中。

但是根据您所展示的内容,我认为 Pool 不是适合您的解决方案;您应该根据需要创建 Process 实例。从进程中获取结果的一种方法是将它们存储在一个可共享的托管字典中,其键例如是创建结果的进程的进程 ID。

为了扩展您的示例,向子任务传递了两个参数,xy,并且需要 return 作为结果 x**2 + 'y**2。子任务将产生两个孙任务实例,每个实例计算其参数的平方。然后,子任务将使用加法组合来自这些进程的 return 值:

from multiprocessing import Process, Manager
import os


def child_task(results_dict, x, y):
    # the child task spawns new tasks
    p1 = Process(target=grandchild_task, args=(results_dict, x))
    p1.start()
    pid1 = p1.pid
    p2 = Process(target=grandchild_task, args=(results_dict, y))
    p2.start()
    pid2 = p2.pid
    p1.join()
    p2.join()
    pid = os.getpid()
    results_dict[pid] = results_dict[pid1] + results_dict[pid2]



def grandchild_task(results_dict, n):
    pid = os.getpid()
    results_dict[pid] = n * n


def main():
    manager = Manager()
    results_dict = manager.dict()
    p = Process(target=child_task, args=(results_dict, 2, 3))
    p.start()
    pid = p.pid
    p.join()
    # results will be stored with key p.pid:
    print(results_dict[pid])

if __name__ == '__main__':
    main()

打印:

13

更新

如果你真的遇到这样的情况,例如,child_task 需要处理 N 个相同的调用,只是参数不同,但它必须产生一个或两个 sub-process,然后使用池和以前一样,但另外将托管字典传递给 child_task 以用于生成其他进程( 而不是 试图为此使用池)并检索它们的结果。

更新 2

我能想到 sub-processes 自己使用池化的唯一方法是使用 concurrent.futures 模块中的 ProcessPoolExecutor class。当我试图用 multiprocessing.Pool 做同样的事情时,我得到了一个错误,因为我们有守护进程试图创建自己的进程。但即使在这里,唯一的方法是池中的每个进程都有自己的进程池。您的计算机上只有有限数量的 processors/cores,因此除非在处理过程中混合了一些 I/O,否则您可以创建所有这些池,但进程将等待机会 运行。因此,尚不清楚将实现什么样的性能提升。还有关闭为 child_task sub-processes 创建的所有池的问题。通常一个 ProcessPoolExecutor 实例是使用 with 块创建的,当该块终止时,创建的池将被清理。但是 child_task 被重复调用并且显然不能使用 with 块,因为我们不希望不断地创建和销毁池。我来到这里有点麻烦:传递了第三个参数,True 或 False,指示 child_task 是否应该启动其池的关闭。此参数的默认值为 False,我们甚至懒得传递它。在检索到所有实际结果并且 child_task 进程现在空闲之后,我们提交 N 个具有虚拟值但 shutdown 设置为 True 的新任务。请注意,ProcessPoolExecutor 函数 map 的工作方式与 Pool class 中的相同函数有很大不同(阅读文档):

from concurrent.futures import ProcessPoolExecutor
import time


child_executor = None


def child_task(x, y, shutdown=False):
    global child_executor

    if child_executor is None:
        child_executor = ProcessPoolExecutor(max_workers=1)
    if shutdown:
        if child_executor:
            child_executor.shutdown(False)
            child_executor = None
            time.sleep(.2) # make sure another process in the pool gets the next task
        return None
    # the child task spawns new task(s)
    future = child_executor.submit(grandchild_task, y)
    # we can compute one of the results using the current process:
    return grandchild_task(x) + future.result()


def grandchild_task(n):
    return n * n


def main():
    N_WORKERS = 2
    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
        # first call is (1, 2), second call is (3, 4):
        results = [result for result in executor.map(child_task, (1, 3), (2, 4))]
        print(results)
        # force a shutdown
        # need N_WORKERS invocations:
        [result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]


if __name__ == '__main__':
    main()

打印:

[5, 25]

检查这个解决方案:

#!/usr/bin/python
# requires Python version 3.8 or higher

from multiprocessing import Queue, Process
import time
from random import randrange
import os
import psutil


# function to be run by each child process
def square(number):
    sleep = randrange(5)
    time.sleep(sleep)
    print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')


# create a queue where all tasks will be placed
queue = Queue()

# indicate how many number of children you want the system to create to run the tasks
number_of_child_proceses = 5

# put all tasks in the queue above
for task in range(19):
    queue.put(task)


# this the main entry/start of the program when you run
def main():
    number_of_task = queue.qsize()
    print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')

    # don't create more number of children than the number of tasks. Also, in the last round, wait for all child process
    # to complete so as to wrap up everything
    if number_of_task <= number_of_child_proceses:
        processes = [Process(target=square, args=(queue.get(),)) for _ in
                     range(number_of_task)]
        for p in processes:
            p.start()
            p.join()

    else:
        processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]
        for p in processes:
            p.start()

    # update count of remaining task
    number_of_task = queue.qsize()

    # run the program in a loop until no more task remains in the queue
    while number_of_task:
        current_process = psutil.Process()
        children = current_process.children()

        # if children process have completed assigned task but there is still more remaining tasks in the queue,
        # assign them more tasks
        if not len(children) and number_of_task:
            print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')
            main()

    # exit the loop if no more task in the queue to work on

    print('\nAll tasks completed!!')
    exit()


if __name__ == "__main__":
    main()

我环顾四周,发现 Ray, which addresses this exact use case using nested remote functions