How to asynchronously process an async generator

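I have a generator that continuously submits new data, and I want to schedule the work for each item as soon as it arrives. So far I have not found a way to use async for without waiting on every iteration. Below is a simplified example with a dummy async_generator; the one I actually use comes from an external library (aiokafka.AIOKafkaConsumer).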

import asyncio


async def async_generator():
    """only an example, really a kafka consumer"""
    await asyncio.sleep(0.1)
    print("submitted record")
    yield 42
    await asyncio.sleep(0.1)
    print("submitted record")
    yield 43


async def process_record():
    await asyncio.sleep(1)
    print("record processed")


async def my_f():
    async for _ in async_generator():
        await process_record()
        print("record processing submitted")


if __name__ == "__main__":
    asyncio.run(my_f())

I would like the output to be:

submitted record
record processing submitted
submitted record
record processing submitted
record processed
record processed

But I get the following instead, because I have to await process_record():

submitted record
record processed
record processing submitted
submitted record
record processed
record processing submitted

I don't think I can use asyncio.gather, because async_generator keeps producing data indefinitely, so a gather placed after the async for would never be reached.

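I had misunderstood how asyncio.create_task works: it starts the task but does not wait for it to finish. This means you can start several tasks that complete in the background. If the generator does eventually finish, an await asyncio.gather(...) statement makes sure every task that was started has completed before execution continues.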
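
As a minimal sketch of that point (background_job and the events list are names made up for illustration, not part of the original code), the following records the order in which things happen. create_task returns immediately, and the job only starts running once the caller yields control to the event loop:

```python
import asyncio


async def background_job(events):
    events.append("job started")
    await asyncio.sleep(0.05)
    events.append("job finished")


async def main():
    events = []  # records the order in which things happen
    # Schedules the coroutine on the event loop; does not wait for it.
    task = asyncio.create_task(background_job(events))
    events.append("create_task returned")
    # Only here do we actually wait for the job to complete.
    await task
    events.append("await returned")
    return events


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

"create_task returned" is recorded before "job started", which is exactly why the loop body no longer blocks on each record.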

So the behavior I wanted can be achieved by building a list of tasks inside the async for and calling gather afterwards. Here is a working example:

import asyncio


async def async_generator():
    for x in range(10):
        await asyncio.sleep(0.5)
        print(f"{x} - submitted record")
        yield x


async def process_record(x):
    await asyncio.sleep(1)
    print(f"{x} - record processed")


async def my_f():
    tasks = []
    async for x in async_generator():
        tasks.append(asyncio.create_task(process_record(x)))
        print(f"{x} - record processing submitted")
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(my_f())

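The first few lines of output are shown below. Note that each record finishes processing only after the next record has already been submitted.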

0 - submitted record
0 - record processing submitted
1 - submitted record
1 - record processing submitted
0 - record processed
2 - submitted record
2 - record processing submitted
1 - record processed
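
One caveat with the list-based version: if the generator really never finishes, as with a long-running consumer, the tasks list grows without bound and the final gather is never reached. A possible variation, sketched here under the assumption that results do not need to be collected in order, keeps only the tasks that are still running in a set and lets each task remove itself when it is done. The names record_source, in_flight and consume are hypothetical, and the finite range stands in for the real consumer so that the example terminates:

```python
import asyncio


async def record_source(n):
    """Hypothetical stand-in for a long-running consumer."""
    for x in range(n):
        await asyncio.sleep(0.01)
        yield x


async def process_record(x, results):
    await asyncio.sleep(0.03)
    results.append(x)


async def consume(n=5):
    results = []
    in_flight = set()  # strong references to tasks that are still running
    async for x in record_source(n):
        task = asyncio.create_task(process_record(x, results))
        in_flight.add(task)
        # Drop the reference once the task completes, so the set stays small.
        task.add_done_callback(in_flight.discard)
    # Reached only if the source ends: wait for whatever is still running.
    await asyncio.gather(*in_flight)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(consume())))
```

Holding a reference to each task matters because the event loop itself only keeps weak references to tasks. Error handling is left out of this sketch; a task that raises would need its exception inspected in the done callback or elsewhere.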