在aiohttp应用进程中监听ZeroMQ
Listen to ZeroMQ in aiohttp application process
我 运行 aiohttp
应用 Gunicorn
在 nginx
后面。
在我的应用程序的初始化模块中,我没有 运行 使用 web.run_app(app)
的应用程序,而是创建一个实例,该实例将由 Gunicorn
导入到 运行 它在每个工作人员中 Gunicorn
创建。
因此 Gunicorn
创建一些工作进程,在其中创建事件循环,然后 runs 在这些循环中创建应用程序的请求处理程序。
我的 aiohttp
应用程序有一组已连接的 WebSockets
(移动应用程序客户端),我想在 Gunicorn
启动的任何应用程序进程中发生事件时通知它们。
我想通知 所有 WebSockets
连接到 所有应用程序进程 。
因此,我使用 ZeroMQ
创建了某种上游代理,我想使用来自每个应用程序进程的 zmq.SUB
套接字订阅它。
...所以基本上我想在每个应用程序工作人员中实现这样的事情:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')
while True:
event = socket.recv()
for ws in app['websockets']:
ws.send_bytes(event)
# break before app shutdown. How?
如何在 aiohttp
应用程序中监听 ZeroMQ
代理以将消息转发给 WebSockets
?
我可以在事件循环的背景中将此代码放在 运行 的什么位置,以及如何 运行 并在 aiohttp
应用程序的生命周期内正确关闭它?
更新
我已经在 aiohttp 的 GitHub 存储库中创建了一个 issue 来描述问题并提出可能的解决方案。我非常感谢在这里或那里就所描述的问题提供意见。
好的,关于这个 issue 的问题和讨论导致了我为 aiohttp
贡献的新功能,即版本 1.0我们将能够使用 Application.on_startup()
方法注册 on_startup
应用程序信号。
Documentation.
Working example on the master branch.
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio
import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app
async def websocket_handler(request):
ws = WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].append(ws)
try:
async for msg in ws:
print(msg)
await asyncio.sleep(1)
finally:
request.app['websockets'].remove(ws)
return ws
async def on_shutdown(app):
for ws in app['websockets']:
await ws.close(code=999, message='Server shutdown')
async def listen_to_redis(app):
try:
sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
ch, *_ = await sub.subscribe('news')
async for msg in ch.iter(encoding='utf-8'):
# Forward message to all connected websockets:
for ws in app['websockets']:
ws.send_str('{}: {}'.format(ch.name, msg))
print("message in {}: {}".format(ch.name, msg))
except asyncio.CancelledError:
pass
finally:
print('Cancel Redis listener: close connection...')
await sub.unsubscribe(ch.name)
await sub.quit()
print('Redis connection closed.')
async def start_background_tasks(app):
app['redis_listener'] = app.loop.create_task(listen_to_redis(app))
async def cleanup_background_tasks(app):
print('cleanup background tasks...')
app['redis_listener'].cancel()
await app['redis_listener']
async def init(loop):
app = Application(loop=loop)
app['websockets'] = []
app.router.add_get('/news', websocket_handler)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
app.on_shutdown.append(on_shutdown)
return app
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
我 运行 aiohttp
应用 Gunicorn
在 nginx
后面。
在我的应用程序的初始化模块中,我没有 运行 使用 web.run_app(app)
的应用程序,而是创建一个实例,该实例将由 Gunicorn
导入到 运行 它在每个工作人员中 Gunicorn
创建。
因此 Gunicorn
创建一些工作进程,在其中创建事件循环,然后 runs 在这些循环中创建应用程序的请求处理程序。
我的 aiohttp
应用程序有一组已连接的 WebSockets
(移动应用程序客户端),我想在 Gunicorn
启动的任何应用程序进程中发生事件时通知它们。
我想通知 所有 WebSockets
连接到 所有应用程序进程 。
因此,我使用 ZeroMQ
创建了某种上游代理,我想使用来自每个应用程序进程的 zmq.SUB
套接字订阅它。
...所以基本上我想在每个应用程序工作人员中实现这样的事情:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')
while True:
event = socket.recv()
for ws in app['websockets']:
ws.send_bytes(event)
# break before app shutdown. How?
如何在 aiohttp
应用程序中监听 ZeroMQ
代理以将消息转发给 WebSockets
?
我可以在事件循环的背景中将此代码放在 运行 的什么位置,以及如何 运行 并在 aiohttp
应用程序的生命周期内正确关闭它?
更新
我已经在 aiohttp 的 GitHub 存储库中创建了一个 issue 来描述问题并提出可能的解决方案。我非常感谢在这里或那里就所描述的问题提供意见。
好的,关于这个 issue 的问题和讨论导致了我为 aiohttp
贡献的新功能,即版本 1.0我们将能够使用 Application.on_startup()
方法注册 on_startup
应用程序信号。
Documentation.
Working example on the master branch.
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio
import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app
async def websocket_handler(request):
ws = WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].append(ws)
try:
async for msg in ws:
print(msg)
await asyncio.sleep(1)
finally:
request.app['websockets'].remove(ws)
return ws
async def on_shutdown(app):
for ws in app['websockets']:
await ws.close(code=999, message='Server shutdown')
async def listen_to_redis(app):
try:
sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
ch, *_ = await sub.subscribe('news')
async for msg in ch.iter(encoding='utf-8'):
# Forward message to all connected websockets:
for ws in app['websockets']:
ws.send_str('{}: {}'.format(ch.name, msg))
print("message in {}: {}".format(ch.name, msg))
except asyncio.CancelledError:
pass
finally:
print('Cancel Redis listener: close connection...')
await sub.unsubscribe(ch.name)
await sub.quit()
print('Redis connection closed.')
async def start_background_tasks(app):
app['redis_listener'] = app.loop.create_task(listen_to_redis(app))
async def cleanup_background_tasks(app):
print('cleanup background tasks...')
app['redis_listener'].cancel()
await app['redis_listener']
async def init(loop):
app = Application(loop=loop)
app['websockets'] = []
app.router.add_get('/news', websocket_handler)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
app.on_shutdown.append(on_shutdown)
return app
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)