暂停 Python 个异步协程

Pausing Python asyncio coroutines

由于我的项目严重依赖异步网络 I/O,我总是不得不预料到会发生一些奇怪的网络错误:是否是我正在连接的服务出现 API 中断,或者我自己的服务器有网络问题,或者其他问题。出现这样的问题,并且没有真正的解决方法。因此,我最终最终试图找到一种方法来有效 "pause" 协程在发生此类网络问题时从外部执行,直到重新建立连接。我的方法是编写一个带有参数 pause 的装饰器 pausable,它是一个协程函数,它将被 yielded from / awaited 像这样:

def pausable(pause, resume_check=None, delay_start=None):
    if not asyncio.iscoroutinefunction(pause):
        raise TypeError("pause must be a coroutine function")
    if not (delay_start is None or asyncio.iscoroutinefunction(delay_start)):
        raise TypeError("delay_start must be a coroutine function")

    def wrapper(coro):
        @asyncio.coroutine
        def wrapped(*args, **kwargs):
            if delay_start is not None:
                yield from delay_start()
            for x in coro(*args, **kwargs):
                try:
                    yield from pause()
                    yield x
                # catch exceptions the regular discord.py user might not catch
                except (asyncio.CancelledError,
                        aiohttp.ClientError,
                        websockets.WebSocketProtocolError,
                        ConnectionClosed,
                        # bunch of other network errors
                        ) as ex:
                    if any((resume_check() if resume_check is not None else False and
                            isinstance(ex, asyncio.CancelledError),
                            # clean disconnect
                            isinstance(ex, ConnectionClosed) and ex.code == 1000,
                            # connection issue
                            not isinstance(ex, ConnectionClosed))):
                        yield from pause()
                        yield x
                    else:
                        raise

        return wrapped
    return wrapper

特别注意这一点:

for x in coro(*args, **kwargs):
    yield from pause()
    yield x

用法示例(ready 是一个 asyncio.Event):

@pausable(ready.wait, resume_check=restarting_enabled, delay_start=ready.wait)
@asyncio.coroutine
def send_test_every_minute():
    while True:
        yield from client.send("Test")
        yield from asyncio.sleep(60)

但是,这似乎不起作用,而且对我来说似乎不是一个优雅的解决方案。是否有与 Python 3.5.3 及更高版本兼容的工作解决方案?希望与 Python 3.4.4 及更高版本兼容。

附录

仅仅 try/excepting 在需要暂停的协程中引发的异常既不总是可行的,也不是可行的选择,因为它严重违反了核心代码设计原则( DRY) 我愿意遵守;换句话说,在这么多协程函数中排除这么多异常会使我的代码变得混乱。

关于当前解决方案的几句话。

for x in coro(*args, **kwargs):
    try:
        yield from pause()
        yield x
    except
        ...

您将无法通过这种方式捕获异常:

  • for 循环外引发异常
  • 生成器在第一个异常之后已经耗尽(不可用)

.

@asyncio.coroutine
def test():
    yield from asyncio.sleep(1)
    raise RuntimeError()
    yield from asyncio.sleep(1)
    print('ok')



@asyncio.coroutine
def main():
    coro = test()
    try:
        for x in coro:
            try:
                yield x
            except Exception:
                print('Exception is NOT here.')
    except Exception:
        print('Exception is here.')

        try:
            next(coro)
        except StopIteration:
            print('And after first exception generator is exhausted.')


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

输出:

Exception is here.
And after first exception generator is exhausted.

即使可以恢复,考虑一下如果协程由于异常已经做了一些清理操作会发生什么。


鉴于以上所有情况,如果某些协程引发异常,您唯一的选择是抑制此异常(如果需要)并重新 运行 此协程。如果你愿意,你可以在一些事件之后重新运行它。像这样:

def restart(ready_to_restart):
    def wrapper(func):
        @asyncio.coroutine
        def wrapped(*args, **kwargs):
            while True:
                try:
                    return (yield from func(*args, **kwargs))
                except (ConnectionClosed,
                        aiohttp.ClientError,
                        websockets.WebSocketProtocolError,
                        ConnectionClosed,
                        # bunch of other network errors
                        ) as ex:
                    yield from ready_to_restart.wait()


ready_to_restart = asyncio.Event()  # set it when you sure network is fine 
                                    # and you're ready to restart

更新

However, how would I make the coroutine continue where it was interrupted now?

澄清一下:

@asyncio.coroutine
def test():
    with aiohttp.ClientSession() as client:
        yield from client.request_1()
        # STEP 1:
        # Let's say line above raises error

        # STEP 2:
        # Imagine you you somehow maged to return to this place
        # after exception above to resume execution.
        # But what is state of 'client' now?
        # It's was freed by context manager when we left coroutine.
        yield from client.request_2()

无论是函数还是协程,都不是为了在异常从它们传播到外部后恢复执行而设计的。

唯一想到的是将复杂操作拆分为可重新启动的小操作,而整个复杂操作可以存储其状态:

@asyncio.coroutine
def complex_operation():
    with aiohttp.ClientSession() as client:
        res = yield from step_1(client)

        # res/client - is a state of complex_operation.
        # It can be used by re-startable steps.

        res = yield from step_2(client, res)


@restart(ready_to_restart)
@asyncio.coroutine
def step_1():
    # ...


@restart(ready_to_restart)
@asyncio.coroutine
def step_2():
    # ...