如何异步处理异步生成器
How to asynchronously process an async generator
我有一个不断提交新数据的生成器。我想在这项工作一进来就安排它。但是到目前为止,我似乎无法找到一种方法来使用 async for 而不等待每个循环。下面是一个简化的例子,包括一个虚拟 async_generator
,我使用的那个来自外部库 (aiokafka.AIOKafkaConsumer
)。
import asyncio
async def async_generator():
"""only an example, really a kafka consumer"""
await asyncio.sleep(0.1)
print("submitted record")
yield 42
await asyncio.sleep(0.1)
print("submitted record")
yield 43
async def process_record():
await asyncio.sleep(1)
print("record processed")
async def my_f():
async for _ in async_generator():
await process_record()
print("record processing submitted")
if __name__ == "__main__":
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(my_f())
我希望它的输出是:
submitted_record
record processing submitted
submitted_record
record processing submitted
record processed
record processed
但是我得到了以下结果,因为我必须等待 process_record()
submitted record
record processed
record processing submitted
submitted record
record processed
record processing submitted
我认为我不能使用 asyncio.gather
,因为 async_generator
会不断产生数据,因此无法到达 async for
之后的收集。
我弄错了 asyncio.create_task
的工作原理,它启动任务但不等待它完成。这意味着您可以启动多个可以在后台完成的任务,但是如果生成器最后完成了 await gather
语句,请确保所有已启动的任务都已完成,然后再继续。
因此,我想要的行为可以通过在 async for
中创建任务列表并在之后调用 gather
来实现。这是一个工作示例:
import asyncio
async def async_generator():
for x in range(10):
await asyncio.sleep(0.5)
print(f"{x} - submitted record")
yield x
async def process_record(x):
await asyncio.sleep(1)
print(f"{x} - record processed")
async def my_f():
tasks = []
async for x in async_generator():
tasks.append(asyncio.create_task(process_record(x)))
print(f"{x} - record processing submitted")
await asyncio.gather(*tasks)
if __name__ == "__main__":
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(my_f())
前几行输出如下。请注意,每条记录都是在下一条记录已提交后才处理的。
0 - submitted record
0 - record processing submitted
1 - submitted record
1 - record processing submitted
0 - record processed
2 - submitted record
2 - record processing submitted
1 - record processed
我有一个不断提交新数据的生成器。我想在这项工作一进来就安排它。但是到目前为止,我似乎无法找到一种方法来使用 async for 而不等待每个循环。下面是一个简化的例子,包括一个虚拟 async_generator
,我使用的那个来自外部库 (aiokafka.AIOKafkaConsumer
)。
import asyncio
async def async_generator():
"""only an example, really a kafka consumer"""
await asyncio.sleep(0.1)
print("submitted record")
yield 42
await asyncio.sleep(0.1)
print("submitted record")
yield 43
async def process_record():
await asyncio.sleep(1)
print("record processed")
async def my_f():
async for _ in async_generator():
await process_record()
print("record processing submitted")
if __name__ == "__main__":
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(my_f())
我希望它的输出是:
submitted_record
record processing submitted
submitted_record
record processing submitted
record processed
record processed
但是我得到了以下结果,因为我必须等待 process_record()
submitted record
record processed
record processing submitted
submitted record
record processed
record processing submitted
我认为我不能使用 asyncio.gather
,因为 async_generator
会不断产生数据,因此无法到达 async for
之后的收集。
我弄错了 asyncio.create_task
的工作原理,它启动任务但不等待它完成。这意味着您可以启动多个可以在后台完成的任务,但是如果生成器最后完成了 await gather
语句,请确保所有已启动的任务都已完成,然后再继续。
因此,我想要的行为可以通过在 async for
中创建任务列表并在之后调用 gather
来实现。这是一个工作示例:
import asyncio
async def async_generator():
for x in range(10):
await asyncio.sleep(0.5)
print(f"{x} - submitted record")
yield x
async def process_record(x):
await asyncio.sleep(1)
print(f"{x} - record processed")
async def my_f():
tasks = []
async for x in async_generator():
tasks.append(asyncio.create_task(process_record(x)))
print(f"{x} - record processing submitted")
await asyncio.gather(*tasks)
if __name__ == "__main__":
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(my_f())
前几行输出如下。请注意,每条记录都是在下一条记录已提交后才处理的。
0 - submitted record
0 - record processing submitted
1 - submitted record
1 - record processing submitted
0 - record processed
2 - submitted record
2 - record processing submitted
1 - record processed