信号在 aiohttp 中不起作用
Signals don't work in aiohttp
我在 aiohttp
上编写了一个简单的异步应用程序。我需要在服务器启动或关闭时扩展 app
实例,但信号根本不起作用(该函数永远不会执行):
from aiohttp import web
app = web.Application()
async def on_startup(app):
app['key'] = "need to save something here"
app.on_startup.append(on_startup)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
srv = loop.run_until_complete(asyncio.gather(
loop.create_server(app.make_handler(), host='0.0.0.0', port=8000)
))
loop.run_forever()
如何通过回调扩展 app
实例?有人有想法吗?
P.S。我使用稳定的 aiohttp
版本 (3.0.9).
如果没有理由像 make_handler()
那样使用 low-level API,请尝试以下方法,它将与 on_startup
信号一起使用。
if __name__ == "__main__":
web.run_app(app, host='0.0.0.0', port=8000)
run_app()
将在其使用的循环内部使用 get_event_loop()
。
我花了一些时间寻找解决方案...我找到了!我决定探索 web.run_app()
方法以了解其工作原理。因此,此方法在 运行 之前使用 AppRunner().setup()
配置应用程序。我不确定这是不是最好的解决方案,但它确实有效 :) 好吧,最终代码如下所示:
from aiohttp import web
app = web.Application()
async def on_startup(app):
app['key'] = "need to save something here"
app.on_startup.append(on_startup)
# Create an instance of the application runner
runner = web.AppRunner(app, handle_signals=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
srv = loop.run_until_complete(asyncio.gather(
runner.setup(), # Configure the application before run the server
loop.create_server(app.make_handler(), host='0.0.0.0', port=8000)
))
loop.run_forever()
我在 aiohttp
上编写了一个简单的异步应用程序。我需要在服务器启动或关闭时扩展 app
实例,但信号根本不起作用(该函数永远不会执行):
from aiohttp import web
app = web.Application()
async def on_startup(app):
app['key'] = "need to save something here"
app.on_startup.append(on_startup)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
srv = loop.run_until_complete(asyncio.gather(
loop.create_server(app.make_handler(), host='0.0.0.0', port=8000)
))
loop.run_forever()
如何通过回调扩展 app
实例?有人有想法吗?
P.S。我使用稳定的 aiohttp
版本 (3.0.9).
如果没有理由像 make_handler()
那样使用 low-level API,请尝试以下方法,它将与 on_startup
信号一起使用。
if __name__ == "__main__":
web.run_app(app, host='0.0.0.0', port=8000)
run_app()
将在其使用的循环内部使用 get_event_loop()
。
我花了一些时间寻找解决方案...我找到了!我决定探索 web.run_app()
方法以了解其工作原理。因此,此方法在 运行 之前使用 AppRunner().setup()
配置应用程序。我不确定这是不是最好的解决方案,但它确实有效 :) 好吧,最终代码如下所示:
from aiohttp import web
app = web.Application()
async def on_startup(app):
app['key'] = "need to save something here"
app.on_startup.append(on_startup)
# Create an instance of the application runner
runner = web.AppRunner(app, handle_signals=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
srv = loop.run_until_complete(asyncio.gather(
runner.setup(), # Configure the application before run the server
loop.create_server(app.make_handler(), host='0.0.0.0', port=8000)
))
loop.run_forever()