用于组合异步迭代器的映射、过滤器和 itertools

Map, filter and itertools for composing asynchronous iterators

Python 是否支持异步迭代器上的函数式操作?我知道我可以使用 mapfilteritertools 来延迟转换和使用来自普通生成器的数据:

from itertools import accumulate, takewhile

def generator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b

# create another iterator, no computation is started yet:
another_iterator = takewhile(lambda x: x < 100, accumulate(generator()))
# start consuming data:
print(list(another_iterator))
# [1, 2, 4, 7, 12, 20, 33, 54, 88]

现在,Python 3.6 的异步 generators/iterators 不支持同样的事情,因为它们当然没有实现正常的迭代器协议:

async def agenerator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b

accumulate(agenerator())

TypeError: 'async_generator' object is not iterable

是否有某种异步映射或异步 itertools 可以在 Python 3.6/3.7 中实现类似的惰性行为?

我看到的最完整的 itertools 异步版本是 aiostream 模块。您的示例将是:

import asyncio
from aiostream.stream import takewhile, accumulate, list as alist


async def agenerator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b


async def main():
    another_iterator = takewhile(
        accumulate(agenerator()),
        lambda x: x < 100, 
    )

    res = await alist(another_iterator)

    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()