Python 多处理中的嵌套并行性

Nested parallelism in Python multiprocessing

我知道这听起来像是以前有人问过的问题,但是等等,我会解释为什么其他选项不起作用。

我目前正在使用 multiprocessing.Pool 在应用程序中实现并行性,并希望对其进行扩展以便能够利用嵌套并行性。仅将 Pool 对象作为参数传递给 apply_async 的天真方法不起作用 as noted in other answers,因为 Pool 无法被腌制。

这是我的要求:

  1. 我需要某种池来限制并发执行任务的数量。例如。 multiprocess.Pool 用于此目的,但不能传递给其他进程。

  2. 我需要嵌套并行。在我的应用程序中,我需要执行 I/O 以确定嵌套工作是什么,所以我绝对不想从单个线程执行此操作。我认为这排除了 this question.

  3. 的所有答案
  4. 需要在标准库中;我无法添加依赖项。这排除了 this answer.

  5. 我真的很希望它能与 Python 2 和 3 一起使用。但是,如果可以证明移动到 Python 3 可以解决我的问题,我会考虑的

我不需要这个来专门使用多个进程,使用线程就可以了,因为大部分工作是 I/O 或等待子进程完成。

我试过使用 multiprocessing.dummy,它是相同的界面,但在 threading 之上实现。但是,当我尝试调用 get() 来检索我的测试结果时,我收到以下错误,所以我认为这是错误的。

  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

我知道 Python 3 中的 concurrent.futures 库,但这似乎有一些严重的限制。例如,本节中的第二个示例在我的案例中似乎是一个表演障碍:

https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor

我看不出您如何能够避免使用任何直接编写的嵌套并行算法来解决这个问题。所以就算我愿意用Python 3,我觉得这也是行不通的。

如果没有编写我自己的实现,我不知道标准库中有任何其他选项可用。

你似乎已经排除了,但我怀疑 https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor, or https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor 会工作,如果你能够移动到 Python 3,或者为 Python 2 添加依赖项。

如果每个文件的额外工作在该文件被处理之前不必被触发,您可以有一个触发所有其他线程的协调线程,这样就可以防止死锁,如下例所示。

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = range(0, dummy_file)
    print("{}: Work is {}".format(dummy_file, work))
    return work

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

futures = []
for dummy_file in dummy_files:
    work_inputs = pool.submit(find_work_inputs, dummy_file)
    for work_input in work_inputs.result():
        result = work_input
        futures.append((dummy_file, result, pool.submit(do_work, dummy_file, result)))

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))

或者,如果第一层的每个线程都需要自己触发工作,额外的工作可能需要在另一个池中以防止死锁(取决于每个未来何时调用 result())作为下面。

from concurrent.futures import ThreadPoolExecutor
import time

find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = range(0, dummy_file)
    print("{}: Work is {}".format(dummy_file, work))

    futures = []
    for work_input in work:
        futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
    return futures

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

futures = []
for dummy_file in dummy_files:
    futures.extend(find_work_pool.submit(find_work_inputs, dummy_file).result())

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))