给定 N 个生成器,是否可以创建一个在并行进程中运行它们并生成这些生成器的 zip 的生成器?
Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?
假设我有 N 个生成器 gen_1, ..., gen_N
,其中每个生成器都会产生相同数量的值。我想要一个生成器 gen
以便它在 N 个并行进程中运行 gen_1, ..., gen_N 并产生 (next(gen_1), next(gen_2), ... next(gen_N))
那是我想要的:
def gen():
yield (next(gen_1), next(gen_2), ... next(gen_N))
每个 gen_i 都在自己的进程中 运行。是否有可能做到这一点?我尝试在以下虚拟示例中执行此操作但没有成功:
A = range(4)
def gen(a):
B = ['a', 'b', 'c']
for b in B:
yield b + str(a)
def target(g):
return next(g)
processes = [Process(target=target, args=(gen(a),)) for a in A]
for p in processes:
p.start()
for p in processes:
p.join()
但是我收到错误 TypeError: cannot pickle 'generator' object
。
编辑:
我对@darkonaut 的回答做了一些修改以满足我的需要。我张贴它以防你们中的一些人发现它有用。我们首先定义几个效用函数:
from itertools import zip_longest
from typing import List, Generator
def grouper(iterable, n, fillvalue=iter([])):
"Collect data into fixed-length chunks or blocks"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def split_generators_into_batches(generators: List[Generator], n_splits):
chunks = grouper(generators, len(generators) // n_splits + 1)
return [zip_longest(*chunk) for chunk in chunks]
以下 class 负责将任意数量的生成器分成 n(进程数)个批次并处理它们以产生所需的结果:
import multiprocessing as mp
class GeneratorParallelProcessor:
SENTINEL = 'S'
def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
self.n_processes = n_processes
self.generators = split_generators_into_batches(list(generators), n_processes)
self.queue = mp.SimpleQueue()
self.barrier = mp.Barrier(n_processes + 1)
self.sentinels = [self.SENTINEL] * n_processes
self.processes = [
mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
]
def process(self):
for p in self.processes:
p.start()
while True:
results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
if results != self.sentinels:
yield results
self.barrier.wait()
else:
break
for p in self.processes:
p.join()
def _worker(self, barrier, queue, generator):
for x in generator:
queue.put(x)
barrier.wait()
queue.put(self.SENTINEL)
要使用它,只需执行以下操作:
parallel_processor = GeneratorParallelProcessor(generators)
for grouped_generator in parallel_processor.process():
output_handler(grouped_generator)
我采用了一些不同的方法,您可以相应地修改下面的示例。
所以在主脚本的某个地方根据你的需要初始化池,你只需要这两行
from multiprocessing import Pool
pool = Pool(processes=4)
然后你可以像这样定义一个生成器函数:
(请注意,生成器输入假定为包含所有生成器的任何可迭代对象)
def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
batch = pool.map_async(next, generators) # defines the next round of values
results = list(batch.get) # actual calculation done here
yield results
return
我们像这样在 while 循环中定义结果条件,因为当生成器停止生成值时,带有 next 和生成器的映射对象 return 是一个空列表。所以那时我们只是终止并行生成器。
编辑
显然多进程池和映射不能很好地与生成器一起使用,导致上述代码无法按预期工作,因此在以后更新之前不要使用。
至于 pickle 错误,似乎某些绑定函数不支持 multiprocessing 库中传输对象和函数所需的 pickle,作为解决方法,pathos mutliprocessing 库使用 dill 解决了 pickle 和是您可能想尝试的一个选项,在 Stack Overflow 中搜索您的错误,您还可以找到一些更复杂的解决方案,这些解决方案使用自定义代码来酸洗所需的功能。
有可能通过一些努力获得这样的“统一并行生成器 (UPG)”(尝试创造一个名称),但正如@jasonharper 已经提到的,您肯定需要到 assemble child-processes 中的 sub-generators,因为 运行 生成器不能被 pickle。
下面的模式是 re-usable,只有生成器函数 gen()
是此示例的自定义模式。该设计使用 multiprocessing.SimpleQueue
for returning generator results to the parent and multiprocessing.Barrier
进行同步。
调用 Barrier.wait()
将阻塞调用者(任何进程中的线程),直到指定数量的 parties
调用了 .wait()
,此时所有当前等待 [=18= 的线程] 得到同时释放。 Barrier
的使用确保进一步的 generator-results 仅在 之后 父级已收到 all 结果才开始计算迭代,这可能是控制整体内存消耗的理想选择。
使用的并行工作人员数量等于您在 gen_args_tuples
-iterable 中提供的 argument-tuples 数量,因此 gen_args_tuples=zip(range(4))
将使用四个工作人员。有关详细信息,请参阅代码中的注释。
import multiprocessing as mp
SENTINEL = 'SENTINEL'
def gen(a):
"""Your individual generator function."""
lst = ['a', 'b', 'c']
for ch in lst:
for _ in range(int(10e6)): # some dummy computation
pass
yield ch + str(a)
def _worker(i, barrier, queue, gen_func, gen_args):
for x in gen_func(*gen_args):
print(f"WORKER-{i} sending item.")
queue.put((i, x))
barrier.wait()
queue.put(SENTINEL)
def parallel_gen(gen_func, gen_args_tuples):
"""Construct and yield from parallel generators
build from `gen_func(gen_args)`.
"""
gen_args_tuples = list(gen_args_tuples) # ensure list
n_gens = len(gen_args_tuples)
sentinels = [SENTINEL] * n_gens
queue = mp.SimpleQueue()
barrier = mp.Barrier(n_gens + 1) # `parties`: + 1 for parent
processes = [
mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
for i, args in enumerate(gen_args_tuples)
]
for p in processes:
p.start()
while True:
results = [queue.get() for _ in range(n_gens)]
if results != sentinels:
results.sort()
yield tuple(r[1] for r in results) # sort and drop ids
barrier.wait() # all workers are waiting
# already, so this will unblock immediately
else:
break
for p in processes:
p.join()
if __name__ == '__main__':
for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
print(res)
输出:
WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')
Process finished with exit code 0
假设我有 N 个生成器 gen_1, ..., gen_N
,其中每个生成器都会产生相同数量的值。我想要一个生成器 gen
以便它在 N 个并行进程中运行 gen_1, ..., gen_N 并产生 (next(gen_1), next(gen_2), ... next(gen_N))
那是我想要的:
def gen():
yield (next(gen_1), next(gen_2), ... next(gen_N))
每个 gen_i 都在自己的进程中 运行。是否有可能做到这一点?我尝试在以下虚拟示例中执行此操作但没有成功:
A = range(4)
def gen(a):
B = ['a', 'b', 'c']
for b in B:
yield b + str(a)
def target(g):
return next(g)
processes = [Process(target=target, args=(gen(a),)) for a in A]
for p in processes:
p.start()
for p in processes:
p.join()
但是我收到错误 TypeError: cannot pickle 'generator' object
。
编辑:
我对@darkonaut 的回答做了一些修改以满足我的需要。我张贴它以防你们中的一些人发现它有用。我们首先定义几个效用函数:
from itertools import zip_longest
from typing import List, Generator
def grouper(iterable, n, fillvalue=iter([])):
"Collect data into fixed-length chunks or blocks"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def split_generators_into_batches(generators: List[Generator], n_splits):
chunks = grouper(generators, len(generators) // n_splits + 1)
return [zip_longest(*chunk) for chunk in chunks]
以下 class 负责将任意数量的生成器分成 n(进程数)个批次并处理它们以产生所需的结果:
import multiprocessing as mp
class GeneratorParallelProcessor:
SENTINEL = 'S'
def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
self.n_processes = n_processes
self.generators = split_generators_into_batches(list(generators), n_processes)
self.queue = mp.SimpleQueue()
self.barrier = mp.Barrier(n_processes + 1)
self.sentinels = [self.SENTINEL] * n_processes
self.processes = [
mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
]
def process(self):
for p in self.processes:
p.start()
while True:
results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
if results != self.sentinels:
yield results
self.barrier.wait()
else:
break
for p in self.processes:
p.join()
def _worker(self, barrier, queue, generator):
for x in generator:
queue.put(x)
barrier.wait()
queue.put(self.SENTINEL)
要使用它,只需执行以下操作:
parallel_processor = GeneratorParallelProcessor(generators)
for grouped_generator in parallel_processor.process():
output_handler(grouped_generator)
我采用了一些不同的方法,您可以相应地修改下面的示例。 所以在主脚本的某个地方根据你的需要初始化池,你只需要这两行
from multiprocessing import Pool
pool = Pool(processes=4)
然后你可以像这样定义一个生成器函数: (请注意,生成器输入假定为包含所有生成器的任何可迭代对象)
def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
batch = pool.map_async(next, generators) # defines the next round of values
results = list(batch.get) # actual calculation done here
yield results
return
我们像这样在 while 循环中定义结果条件,因为当生成器停止生成值时,带有 next 和生成器的映射对象 return 是一个空列表。所以那时我们只是终止并行生成器。
编辑
显然多进程池和映射不能很好地与生成器一起使用,导致上述代码无法按预期工作,因此在以后更新之前不要使用。
至于 pickle 错误,似乎某些绑定函数不支持 multiprocessing 库中传输对象和函数所需的 pickle,作为解决方法,pathos mutliprocessing 库使用 dill 解决了 pickle 和是您可能想尝试的一个选项,在 Stack Overflow 中搜索您的错误,您还可以找到一些更复杂的解决方案,这些解决方案使用自定义代码来酸洗所需的功能。
有可能通过一些努力获得这样的“统一并行生成器 (UPG)”(尝试创造一个名称),但正如@jasonharper 已经提到的,您肯定需要到 assemble child-processes 中的 sub-generators,因为 运行 生成器不能被 pickle。
下面的模式是 re-usable,只有生成器函数 gen()
是此示例的自定义模式。该设计使用 multiprocessing.SimpleQueue
for returning generator results to the parent and multiprocessing.Barrier
进行同步。
调用 Barrier.wait()
将阻塞调用者(任何进程中的线程),直到指定数量的 parties
调用了 .wait()
,此时所有当前等待 [=18= 的线程] 得到同时释放。 Barrier
的使用确保进一步的 generator-results 仅在 之后 父级已收到 all 结果才开始计算迭代,这可能是控制整体内存消耗的理想选择。
使用的并行工作人员数量等于您在 gen_args_tuples
-iterable 中提供的 argument-tuples 数量,因此 gen_args_tuples=zip(range(4))
将使用四个工作人员。有关详细信息,请参阅代码中的注释。
import multiprocessing as mp
SENTINEL = 'SENTINEL'
def gen(a):
"""Your individual generator function."""
lst = ['a', 'b', 'c']
for ch in lst:
for _ in range(int(10e6)): # some dummy computation
pass
yield ch + str(a)
def _worker(i, barrier, queue, gen_func, gen_args):
for x in gen_func(*gen_args):
print(f"WORKER-{i} sending item.")
queue.put((i, x))
barrier.wait()
queue.put(SENTINEL)
def parallel_gen(gen_func, gen_args_tuples):
"""Construct and yield from parallel generators
build from `gen_func(gen_args)`.
"""
gen_args_tuples = list(gen_args_tuples) # ensure list
n_gens = len(gen_args_tuples)
sentinels = [SENTINEL] * n_gens
queue = mp.SimpleQueue()
barrier = mp.Barrier(n_gens + 1) # `parties`: + 1 for parent
processes = [
mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
for i, args in enumerate(gen_args_tuples)
]
for p in processes:
p.start()
while True:
results = [queue.get() for _ in range(n_gens)]
if results != sentinels:
results.sort()
yield tuple(r[1] for r in results) # sort and drop ids
barrier.wait() # all workers are waiting
# already, so this will unblock immediately
else:
break
for p in processes:
p.join()
if __name__ == '__main__':
for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
print(res)
输出:
WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')
Process finished with exit code 0