如何将基于回调的异步函数转换为异步生成器

How to convert callback-based async function to async generator

目前可以通过提供将在每个传入连接上触发的回调来启动异步服务器:

async def on_connection(reader, writer):
    # this is invoked each time a new connection is made
    pass

server = await asyncio.start_server(on_connection, host, port)

我想摆脱回调并改用 async for,所以它看起来像这样:

async for reader, writer in my_start_server(host, port):
    # this should be invoked each time a new connection is made
    pass

不幸的是,这似乎并不容易:

async def my_start_server(host, port):
    async def on_connection(reader, writer):
        # here I have to somehow yield (reader, writer) tuple
        # but if I do just `yield (reader, writer)`, it would 
        # make `on_connection` itself a generator, but would 
        # not affect `my_start_server`
        pass

    server = await asyncio.start_server(on_connection, host, port)

我曾考虑过 class 和 __aiter__ 实现,但结果似乎过于复杂了。那么,这是唯一的方法吗,还是我错过了将异步回调转换为异步生成器的任何简单方法?

正如 Vincent 所注意到的,您不应该使用 async for 来获取连接,因为其中的代码会阻止处理其他连接。将每个连接视为单独的任务,应该 运行 而不管其他连接。

下面是如何在 my_start_server 中使用队列 yield 的示例,这也表明我们仍然会 return 某种 on_connection:

async def my_start_server(host, port):
    queue = asyncio.Queue()

    async def put(reader, writer):
        await queue.put((reader, writer,))

    await asyncio.start_server(put, host, port)

    while True:
        yield (await queue.get())

.

async for (reader, writer) in my_start_server(host, port):

    # If we will just handle reader/writer here, others will be suspended from handling.
    # We can avoid it starting some task,
    # but in this case 'handle' would be nothing different from 'on_connection'

    asyncio.Task(handle(reader, writer))