给定 N 个生成器,是否可以创建一个在并行进程中运行它们并生成这些生成器的 zip 的生成器?

Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?

假设我有 N 个生成器 gen_1, ..., gen_N,其中每个生成器都会产生相同数量的值。我想要一个生成器 gen 以便它在 N 个并行进程中运行 gen_1, ..., gen_N 并产生 (next(gen_1), next(gen_2), ... next(gen_N))

那是我想要的:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

每个 gen_i 都在自己的进程中 运行。是否有可能做到这一点?我尝试在以下虚拟示例中执行此操作但没有成功:

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

但是我收到错误 TypeError: cannot pickle 'generator' object

编辑:

我对@darkonaut 的回答做了一些修改以满足我的需要。我张贴它以防你们中的一些人发现它有用。我们首先定义几个效用函数:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)

    return [zip_longest(*chunk) for chunk in chunks]

以下 class 负责将任意数量的生成器分成 n(进程数)个批次并处理它们以产生所需的结果:

import multiprocessing as mp

class GeneratorParallelProcessor:
SENTINEL = 'S'

def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
    self.n_processes = n_processes
    self.generators = split_generators_into_batches(list(generators), n_processes)
    self.queue = mp.SimpleQueue()
    self.barrier = mp.Barrier(n_processes + 1)
    self.sentinels = [self.SENTINEL] * n_processes

    self.processes = [
        mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
    ]

def process(self):
    for p in self.processes:
        p.start()

    while True:
        results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
        if results != self.sentinels:
            yield results
            self.barrier.wait()
        else:
            break

    for p in self.processes:
        p.join()

def _worker(self, barrier, queue, generator):
    for x in generator:
        queue.put(x)
        barrier.wait()
    queue.put(self.SENTINEL)

要使用它,只需执行以下操作:

parallel_processor = GeneratorParallelProcessor(generators)

    for grouped_generator in parallel_processor.process():
        output_handler(grouped_generator)

我采用了一些不同的方法,您可以相应地修改下面的示例。 所以在主脚本的某个地方根据你的需要初始化池,你只需要这两行

from multiprocessing import Pool

pool = Pool(processes=4)

然后你可以像这样定义一个生成器函数: (请注意,生成器输入假定为包含所有生成器的任何可迭代对象)

def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
    batch = pool.map_async(next, generators)  # defines the next round of values
    results = list(batch.get)  # actual calculation done here
    yield results
return 

我们像这样在 while 循环中定义结果条件,因为当生成器停止生成值时,带有 next 和生成器的映射对象 return 是一个空列表。所以那时我们只是终止并行生成器。

编辑

显然多进程池和映射不能很好地与生成器一起使用,导致上述代码无法按预期工作,因此在以后更新之前不要使用

至于 pickle 错误,似乎某些绑定函数不支持 multiprocessing 库中传输对象和函数所需的 pickle,作为解决方法,pathos mutliprocessing 库使用 dill 解决了 pickle 和是您可能想尝试的一个选项,在 Stack Overflow 中搜索您的错误,您还可以找到一些更复杂的解决方案,这些解决方案使用自定义代码来酸洗所需的功能。

有可能通过一些努力获得这样的“统一并行生成器 (UPG)”(尝试创造一个名称),但正如@jasonharper 已经提到的,您肯定需要到 assemble child-processes 中的 sub-generators,因为 运行 生成器不能被 pickle。

下面的模式是 re-usable,只有生成器函数 gen() 是此示例的自定义模式。该设计使用 multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier 进行同步。

调用 Barrier.wait() 将阻塞调用者(任何进程中的线程),直到指定数量的 parties 调用了 .wait(),此时所有当前等待 [=18= 的线程] 得到同时释放。 Barrier 的使用确保进一步的 generator-results 仅在 之后 父级已收到 all 结果才开始计算迭代,这可能是控制整体内存消耗的理想选择。

使用的并行工作人员数量等于您在 gen_args_tuples-iterable 中提供的 argument-tuples 数量,因此 gen_args_tuples=zip(range(4)) 将使用四个工作人员。有关详细信息,请参阅代码中的注释。

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

输出:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0