如何正确锁定异步生成器?

How to correctly lock async generators?

我正在尝试使用异步生成器作为共享连接的包装器

async def mygen():
    await init()
    connection = await open_connection()
    while True:
        data = yield
        await connection.send(data)


shared_gen = None

async def send_data(data):
    global shared_gen
    if not shared_gen:
        shared_gen = mygen()
        await shared_gen.asend(None)
    await shared_gen.asend(data)

上面的代码在竞态条件下是安全的吗?两个 asend 是否可以同时执行,或者第二个会隐式阻塞,直到生成器在 yield 步骤中准备就绪?假设 connection.send 不是并发安全的。

更新:

写了一个包装器来帮助安全使用。

class Locked:
    def __init__(self, resource):
        self._resource = resource
        self._lock = asyncio.Lock()

    @contextlib.asynccontextmanager
    async def lock(self):
        async with self._lock:
            yield self._resource

async def send_data(locked_gen, data):
    async with locked_gen.lock() as gen:
        await gen.asend(data)


async def main():
    gen = mygen()
    await gen.asend(None)
    locked_gen = Locked(gen)
    ...

Is it possible for two asends to execute concurrently or the second one will block implicitly until the generator is ready in the yield step?

不可能并发调用 asend,但尝试这样做不会导致阻塞。相反,第二个将引发 RuntimeError,如下例所示:

import asyncio
 
async def gen():
    while True:
        yield
        await asyncio.sleep(1)

async def main():
    ait = gen()
    await ait.asend(None)  # start the generator
    async def send():
        print('sending')
        await ait.asend(42)
    await asyncio.gather(send(), send())
 
asyncio.run(main())

要使发送阻塞直到前一个发送完成,您需要在 asend 的等待周围显式锁定:

async def main():
    ait = gen()
    await ait.asend(None)
    lock = asyncio.Lock()
    async def send():
        async with lock:
            print('sending')
            await ait.asend(42)
    await asyncio.gather(send(), send())