暂停 Python 个异步协程
Pausing Python asyncio coroutines
由于我的项目严重依赖异步网络 I/O,我总是不得不预料到会发生一些奇怪的网络错误:是否是我正在连接的服务出现 API 中断,或者我自己的服务器有网络问题,或者其他问题。出现这样的问题,并且没有真正的解决方法。因此,我最终最终试图找到一种方法来有效 "pause" 协程在发生此类网络问题时从外部执行,直到重新建立连接。我的方法是编写一个带有参数 pause
的装饰器 pausable
,它是一个协程函数,它将被 yield
ed from
/ await
ed 像这样:
def pausable(pause, resume_check=None, delay_start=None):
if not asyncio.iscoroutinefunction(pause):
raise TypeError("pause must be a coroutine function")
if not (delay_start is None or asyncio.iscoroutinefunction(delay_start)):
raise TypeError("delay_start must be a coroutine function")
def wrapper(coro):
@asyncio.coroutine
def wrapped(*args, **kwargs):
if delay_start is not None:
yield from delay_start()
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
# catch exceptions the regular discord.py user might not catch
except (asyncio.CancelledError,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
if any((resume_check() if resume_check is not None else False and
isinstance(ex, asyncio.CancelledError),
# clean disconnect
isinstance(ex, ConnectionClosed) and ex.code == 1000,
# connection issue
not isinstance(ex, ConnectionClosed))):
yield from pause()
yield x
else:
raise
return wrapped
return wrapper
特别注意这一点:
for x in coro(*args, **kwargs):
yield from pause()
yield x
用法示例(ready
是一个 asyncio.Event
):
@pausable(ready.wait, resume_check=restarting_enabled, delay_start=ready.wait)
@asyncio.coroutine
def send_test_every_minute():
while True:
yield from client.send("Test")
yield from asyncio.sleep(60)
但是,这似乎不起作用,而且对我来说似乎不是一个优雅的解决方案。是否有与 Python 3.5.3 及更高版本兼容的工作解决方案?希望与 Python 3.4.4 及更高版本兼容。
附录
仅仅 try
/except
ing 在需要暂停的协程中引发的异常既不总是可行的,也不是可行的选择,因为它严重违反了核心代码设计原则( DRY) 我愿意遵守;换句话说,在这么多协程函数中排除这么多异常会使我的代码变得混乱。
关于当前解决方案的几句话。
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
except
...
您将无法通过这种方式捕获异常:
- for 循环外引发异常
- 生成器在第一个异常之后已经耗尽(不可用)
.
@asyncio.coroutine
def test():
yield from asyncio.sleep(1)
raise RuntimeError()
yield from asyncio.sleep(1)
print('ok')
@asyncio.coroutine
def main():
coro = test()
try:
for x in coro:
try:
yield x
except Exception:
print('Exception is NOT here.')
except Exception:
print('Exception is here.')
try:
next(coro)
except StopIteration:
print('And after first exception generator is exhausted.')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
输出:
Exception is here.
And after first exception generator is exhausted.
即使可以恢复,考虑一下如果协程由于异常已经做了一些清理操作会发生什么。
鉴于以上所有情况,如果某些协程引发异常,您唯一的选择是抑制此异常(如果需要)并重新 运行 此协程。如果你愿意,你可以在一些事件之后重新运行它。像这样:
def restart(ready_to_restart):
def wrapper(func):
@asyncio.coroutine
def wrapped(*args, **kwargs):
while True:
try:
return (yield from func(*args, **kwargs))
except (ConnectionClosed,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
yield from ready_to_restart.wait()
ready_to_restart = asyncio.Event() # set it when you sure network is fine
# and you're ready to restart
更新
However, how would I make the coroutine continue where it was
interrupted now?
澄清一下:
@asyncio.coroutine
def test():
with aiohttp.ClientSession() as client:
yield from client.request_1()
# STEP 1:
# Let's say line above raises error
# STEP 2:
# Imagine you you somehow maged to return to this place
# after exception above to resume execution.
# But what is state of 'client' now?
# It's was freed by context manager when we left coroutine.
yield from client.request_2()
无论是函数还是协程,都不是为了在异常从它们传播到外部后恢复执行而设计的。
唯一想到的是将复杂操作拆分为可重新启动的小操作,而整个复杂操作可以存储其状态:
@asyncio.coroutine
def complex_operation():
with aiohttp.ClientSession() as client:
res = yield from step_1(client)
# res/client - is a state of complex_operation.
# It can be used by re-startable steps.
res = yield from step_2(client, res)
@restart(ready_to_restart)
@asyncio.coroutine
def step_1():
# ...
@restart(ready_to_restart)
@asyncio.coroutine
def step_2():
# ...
由于我的项目严重依赖异步网络 I/O,我总是不得不预料到会发生一些奇怪的网络错误:是否是我正在连接的服务出现 API 中断,或者我自己的服务器有网络问题,或者其他问题。出现这样的问题,并且没有真正的解决方法。因此,我最终最终试图找到一种方法来有效 "pause" 协程在发生此类网络问题时从外部执行,直到重新建立连接。我的方法是编写一个带有参数 pause
的装饰器 pausable
,它是一个协程函数,它将被 yield
ed from
/ await
ed 像这样:
def pausable(pause, resume_check=None, delay_start=None):
if not asyncio.iscoroutinefunction(pause):
raise TypeError("pause must be a coroutine function")
if not (delay_start is None or asyncio.iscoroutinefunction(delay_start)):
raise TypeError("delay_start must be a coroutine function")
def wrapper(coro):
@asyncio.coroutine
def wrapped(*args, **kwargs):
if delay_start is not None:
yield from delay_start()
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
# catch exceptions the regular discord.py user might not catch
except (asyncio.CancelledError,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
if any((resume_check() if resume_check is not None else False and
isinstance(ex, asyncio.CancelledError),
# clean disconnect
isinstance(ex, ConnectionClosed) and ex.code == 1000,
# connection issue
not isinstance(ex, ConnectionClosed))):
yield from pause()
yield x
else:
raise
return wrapped
return wrapper
特别注意这一点:
for x in coro(*args, **kwargs):
yield from pause()
yield x
用法示例(ready
是一个 asyncio.Event
):
@pausable(ready.wait, resume_check=restarting_enabled, delay_start=ready.wait)
@asyncio.coroutine
def send_test_every_minute():
while True:
yield from client.send("Test")
yield from asyncio.sleep(60)
但是,这似乎不起作用,而且对我来说似乎不是一个优雅的解决方案。是否有与 Python 3.5.3 及更高版本兼容的工作解决方案?希望与 Python 3.4.4 及更高版本兼容。
附录
仅仅 try
/except
ing 在需要暂停的协程中引发的异常既不总是可行的,也不是可行的选择,因为它严重违反了核心代码设计原则( DRY) 我愿意遵守;换句话说,在这么多协程函数中排除这么多异常会使我的代码变得混乱。
关于当前解决方案的几句话。
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
except
...
您将无法通过这种方式捕获异常:
- for 循环外引发异常
- 生成器在第一个异常之后已经耗尽(不可用)
.
@asyncio.coroutine
def test():
yield from asyncio.sleep(1)
raise RuntimeError()
yield from asyncio.sleep(1)
print('ok')
@asyncio.coroutine
def main():
coro = test()
try:
for x in coro:
try:
yield x
except Exception:
print('Exception is NOT here.')
except Exception:
print('Exception is here.')
try:
next(coro)
except StopIteration:
print('And after first exception generator is exhausted.')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
输出:
Exception is here.
And after first exception generator is exhausted.
即使可以恢复,考虑一下如果协程由于异常已经做了一些清理操作会发生什么。
鉴于以上所有情况,如果某些协程引发异常,您唯一的选择是抑制此异常(如果需要)并重新 运行 此协程。如果你愿意,你可以在一些事件之后重新运行它。像这样:
def restart(ready_to_restart):
def wrapper(func):
@asyncio.coroutine
def wrapped(*args, **kwargs):
while True:
try:
return (yield from func(*args, **kwargs))
except (ConnectionClosed,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
yield from ready_to_restart.wait()
ready_to_restart = asyncio.Event() # set it when you sure network is fine
# and you're ready to restart
更新
However, how would I make the coroutine continue where it was interrupted now?
澄清一下:
@asyncio.coroutine
def test():
with aiohttp.ClientSession() as client:
yield from client.request_1()
# STEP 1:
# Let's say line above raises error
# STEP 2:
# Imagine you you somehow maged to return to this place
# after exception above to resume execution.
# But what is state of 'client' now?
# It's was freed by context manager when we left coroutine.
yield from client.request_2()
无论是函数还是协程,都不是为了在异常从它们传播到外部后恢复执行而设计的。
唯一想到的是将复杂操作拆分为可重新启动的小操作,而整个复杂操作可以存储其状态:
@asyncio.coroutine
def complex_operation():
with aiohttp.ClientSession() as client:
res = yield from step_1(client)
# res/client - is a state of complex_operation.
# It can be used by re-startable steps.
res = yield from step_2(client, res)
@restart(ready_to_restart)
@asyncio.coroutine
def step_1():
# ...
@restart(ready_to_restart)
@asyncio.coroutine
def step_2():
# ...