async_generator 块
Chunks of async_generator
我可以获得迭代器的块,如下所示:
def get_chunks_it(l, n):
""" Chunks an iterator `l` in size `n`
Args:
l (Iterator[Any]): an iterator
n (int): size of
Returns:
Generator[Any]
"""
iterator = iter(l)
for first in iterator:
yield itertools.chain([first], itertools.islice(iterator, n - 1))
现在假设我有一个异步发电机 (python 3.6):
async def generator():
for i in range(0, 10):
yield i
await asyncio.sleep(1)
我怎样才能得到结果的块(假设大小为 3 会产生 [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]) async_generator
这样我就可以写:
async for chunk in get_chunk_it_async(generator(), 3):
print(chunk)
Python 3.6 中的 lack of an aiter()
function 稍微复杂了一点(一旦从 __aiter__
返回一个可等待对象被适当弃用,它将被添加到 3.7 中)。 itertools
对象也没有异步版本。
定义你自己的:
try:
aiter
except NameError:
# not yet a built-in, define our own shim for now
from inspect import iscoroutinefunction as _isasync
def aiter(ob, _isasync=_isasync):
obtype = type(ob) # magic methods are looked up on the type
if not hasattr(obtype, '__aiter__'):
raise TypeError(f'{obtype.__name__!r} object is not async iterable')
async_iter = obtype.__aiter__(ob)
if _isasync(async_iter):
# PEP 492 allowed for __aiter__ to be a coroutine, but 525 reverses this again
raise TypeError(f'{obtype.__name__!r} object is not async iterable')
return async_iter
del _isasync
接下来,您需要定义异步 islice
和 chain
对象:
class achain():
"""Chain over multiple async iterators"""
def __init__(self, *async_iterables):
self._source = iter(async_iterables)
self._active = None
def __aiter__(self):
return self
async def __anext__(self):
if self._source is None:
# we are all done, nothing more to produce
raise StopAsyncIteration
if self._active is None:
# get next async iterable to loop over
ait = next(self._source, None)
if ait is None:
# we are all done, nothing more to produce
self._source = None
raise StopAsyncIteration
self._active = aiter(ait)
try:
return await type(ait).__anext__(ait)
except StopAsyncIteration:
# exhausted, recurse
self._active = None
return await self.__anext__()
class aslice():
"""Slice an async iterator"""
def __init__(self, ait, start, stop=None, step=1):
if stop is None:
start, stop = 0, start
self._ait = ait
self._next, self._stop, self._step = start, stop, step
self._cnt = 0
def __aiter__(self):
return self
async def __anext__(self):
ait, stop = self._ait, self._stop
if ait is None:
raise StopAsyncIteration
anext = type(ait).__anext__
while self._cnt < self._next:
try:
await anext(ait)
except StopAsyncIteration:
self._ait = None
raise
self._cnt += 1
if stop is not None and self._cnt >= stop:
self._ait = None
raise StopAsyncIteration
try:
item = await anext(ait)
except StopAsyncIteration:
self._ait = None
raise
self._cnt += 1
self._next += self._step
return item
有了这些,只需在正确的地方添加 async
:
async def get_chunks_it(l, n):
""" Chunks an async iterator `l` in size `n`
Args:
l (Iterator[Any]): an iterator
n (int): size of
Returns:
Generator[Any]
"""
iterator = aiter(l)
async for first in iterator:
async def afirst():
yield first
yield achain(afirst, aslice(iterator, n - 1))
您可以使用 aiostream.stream.chunks:
from aiostream import stream
async def main():
async for x in stream.chunks(generator(), 3):
print(x)
输出:
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
有关详细信息,请参阅 project page and the documentation。
免责声明:我是项目维护者。
我可以获得迭代器的块,如下所示:
def get_chunks_it(l, n):
""" Chunks an iterator `l` in size `n`
Args:
l (Iterator[Any]): an iterator
n (int): size of
Returns:
Generator[Any]
"""
iterator = iter(l)
for first in iterator:
yield itertools.chain([first], itertools.islice(iterator, n - 1))
现在假设我有一个异步发电机 (python 3.6):
async def generator():
for i in range(0, 10):
yield i
await asyncio.sleep(1)
我怎样才能得到结果的块(假设大小为 3 会产生 [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]) async_generator
这样我就可以写:
async for chunk in get_chunk_it_async(generator(), 3):
print(chunk)
Python 3.6 中的 lack of an aiter()
function 稍微复杂了一点(一旦从 __aiter__
返回一个可等待对象被适当弃用,它将被添加到 3.7 中)。 itertools
对象也没有异步版本。
定义你自己的:
try:
aiter
except NameError:
# not yet a built-in, define our own shim for now
from inspect import iscoroutinefunction as _isasync
def aiter(ob, _isasync=_isasync):
obtype = type(ob) # magic methods are looked up on the type
if not hasattr(obtype, '__aiter__'):
raise TypeError(f'{obtype.__name__!r} object is not async iterable')
async_iter = obtype.__aiter__(ob)
if _isasync(async_iter):
# PEP 492 allowed for __aiter__ to be a coroutine, but 525 reverses this again
raise TypeError(f'{obtype.__name__!r} object is not async iterable')
return async_iter
del _isasync
接下来,您需要定义异步 islice
和 chain
对象:
class achain():
"""Chain over multiple async iterators"""
def __init__(self, *async_iterables):
self._source = iter(async_iterables)
self._active = None
def __aiter__(self):
return self
async def __anext__(self):
if self._source is None:
# we are all done, nothing more to produce
raise StopAsyncIteration
if self._active is None:
# get next async iterable to loop over
ait = next(self._source, None)
if ait is None:
# we are all done, nothing more to produce
self._source = None
raise StopAsyncIteration
self._active = aiter(ait)
try:
return await type(ait).__anext__(ait)
except StopAsyncIteration:
# exhausted, recurse
self._active = None
return await self.__anext__()
class aslice():
"""Slice an async iterator"""
def __init__(self, ait, start, stop=None, step=1):
if stop is None:
start, stop = 0, start
self._ait = ait
self._next, self._stop, self._step = start, stop, step
self._cnt = 0
def __aiter__(self):
return self
async def __anext__(self):
ait, stop = self._ait, self._stop
if ait is None:
raise StopAsyncIteration
anext = type(ait).__anext__
while self._cnt < self._next:
try:
await anext(ait)
except StopAsyncIteration:
self._ait = None
raise
self._cnt += 1
if stop is not None and self._cnt >= stop:
self._ait = None
raise StopAsyncIteration
try:
item = await anext(ait)
except StopAsyncIteration:
self._ait = None
raise
self._cnt += 1
self._next += self._step
return item
有了这些,只需在正确的地方添加 async
:
async def get_chunks_it(l, n):
""" Chunks an async iterator `l` in size `n`
Args:
l (Iterator[Any]): an iterator
n (int): size of
Returns:
Generator[Any]
"""
iterator = aiter(l)
async for first in iterator:
async def afirst():
yield first
yield achain(afirst, aslice(iterator, n - 1))
您可以使用 aiostream.stream.chunks:
from aiostream import stream
async def main():
async for x in stream.chunks(generator(), 3):
print(x)
输出:
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
有关详细信息,请参阅 project page and the documentation。
免责声明:我是项目维护者。