Reverse of itertools.groupby?
I'm composing generators to do some data processing. I first batch the data generator so the batches can be threaded through API calls, like this:
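from itertools import count, groupby
from typing import Any, List

def batch(data: List[Any], size=4):
    c = count()
    # The key ignores the item and counts calls, so every `size` consecutive items share a key.
    for _, g in groupby(data, lambda _: next(c) // size):
        yield g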
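For illustration, here is a minimal sketch of how the counter-based key splits an input into batches. The `range(10)` input is only an assumption for the demo. Each group is materialized with `list` before moving on, because `groupby` groups share the underlying iterator and become invalid once `groupby` advances.

```python
from itertools import count, groupby

def batch(data, size=4):
    c = count()
    # Items 0..size-1 get key 0, the next `size` items get key 1, and so on.
    for _, g in groupby(data, lambda _: next(c) // size):
        yield g

# Consume each group before asking for the next one.
batches = [list(g) for g in batch(range(10), size=4)]
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```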
Then I feed that to a threader that makes the API calls:
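from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable

def thread(data: Iterable, func: Callable, n=4):
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            # Yields one result iterator per batch, not the individual results.
            yield executor.map(func, batch)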
Now I'm trying to merge the batches back into a list/generator for use downstream in the generator pipeline. I tried this:
from itertools import chain
from typing import Iterable

def flat_map(batches: Iterable):
    for i in list(chain(batches)):
        yield i
But each i still seems to be a generator rather than an item from the list?
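You want chain(*batches) or chain.from_iterable(batches). chain(batches) basically just yields the same values you would get from iterating batches directly; it only adds a layer of wrapping. So the correct code (without the list call, which is almost certainly wrong here) is just: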
from itertools import chain
from typing import Iterable

def flat_map(batches: Iterable):
    # chain(*batches) would also work, but if batches is itself an iterator, unpacking forces it to run to completion first;
    # chain.from_iterable can begin work as soon as the first batch is ready.
    return chain.from_iterable(batches)
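You don't even need yield, since the iterator already produces the results you want. If you need it to be a true generator, just replace return with yield from to get a similar result.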
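The difference between these variants can be checked with a small sketch. The `make_batches` helper below is a hypothetical stand-in for the output of thread (an iterator of iterators), and the two `flat_map_*` names are invented here only to put the `return` and `yield from` versions side by side.

```python
import inspect
from itertools import chain

def make_batches():
    # Hypothetical stand-in for thread()'s output: an iterator of iterators.
    yield iter([1, 2])
    yield iter([3, 4])

def flat_map_return(batches):
    # Plain function: hands back the chain iterator directly.
    return chain.from_iterable(batches)

def flat_map_gen(batches):
    # Generator function: delegates to the same chain iterator.
    yield from chain.from_iterable(batches)

wrapped = list(chain(make_batches()))           # two inner iterators, not flattened
flat_a = list(flat_map_return(make_batches()))  # [1, 2, 3, 4]
flat_b = list(flat_map_gen(make_batches()))     # [1, 2, 3, 4]

print(len(wrapped), flat_a, flat_b)
print(inspect.isgeneratorfunction(flat_map_return))  # False
print(inspect.isgeneratorfunction(flat_map_gen))     # True
```

Both versions produce the same items; the only observable difference in this sketch is whether the function itself is a generator function.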
Also note: you can avoid needing that function at all just by changing:
yield executor.map(func, batch)
to:
yield from executor.map(func, batch)
so that thread is flattened from the start.
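As a rough sketch of that change, the version of thread below uses `yield from`, so callers receive individual results instead of one iterator per batch. The `double` function is a hypothetical placeholder for a real API call, and the nested list input is just demo data.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable

def thread(data: Iterable[Iterable], func: Callable, n: int = 4):
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            # executor.map returns results in input order, so the
            # flattened stream keeps the original ordering.
            yield from executor.map(func, batch)

def double(x):
    # Hypothetical placeholder for an API call.
    return x * 2

results = list(thread([[1, 2], [3, 4, 5]], double))
print(results)  # [2, 4, 6, 8, 10]
```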
So I ended up condensing the three functions into one:
from concurrent.futures import ThreadPoolExecutor
from itertools import count, groupby
from typing import Callable, Iterable

def spread(data: Iterable, func: Callable, n=4):
    """ Combines `batch`, `thread` and `flat_map`"""
    c = count()
    with ThreadPoolExecutor(max_workers=n) as executor:
        # n is used both as the batch size and as the number of worker threads.
        for _, batch in groupby(data, lambda _: next(c) // n):
            yield from executor.map(func, batch)
So all I needed was yield from to make it work.
Thanks @ShadowRanger!
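A quick end-to-end check of the combined function might look like the sketch below. `fake_api_call` and the `range(10)` input are assumptions for the demo only; the point is that the flattened output matches what a plain sequential loop would produce, in the same order.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import count, groupby
from typing import Callable, Iterable

def spread(data: Iterable, func: Callable, n: int = 4):
    c = count()
    with ThreadPoolExecutor(max_workers=n) as executor:
        for _, batch in groupby(data, lambda _: next(c) // n):
            yield from executor.map(func, batch)

def fake_api_call(x):
    # Hypothetical placeholder for a real API call.
    return x * x

squares = list(spread(iter(range(10)), fake_api_call, n=3))
expected = [fake_api_call(x) for x in range(10)]
print(squares == expected)  # True
```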