aiohttp 中的超时 WebSocket 连接

Timeout WebSocket connections in aiohttp

我的 WebSocket 服务器实现是对世界开放的,但客户端需要在连接建立后发送身份验证消息,否则服务器应关闭连接。

如何在 aiohttp 中实现它?看来,我需要做以下事情:

  1. 为每个套接字连接创建一个 on_open 方法:我找不到创建此类事件的方法(类似于 Tornado 中的 on_open)。

  2. 创建定时器:可以使用主事件循环的asyncio的sleepcall_back方法。但是我找不到将 WebSocketResponse 发送到回调函数的方法:

    await asyncio.sleep(10, timer, loop=request.app.loop)

  3. 如果未通过身份验证则关闭连接

这是我之前使用 Tornado 的结果:

def open(self, *args, **kwargs):
    self.timeout = ioloop.IOLoop.instance().add_timeout(
        datetime.timedelta(seconds=60),
        self._close_on_timeout
    )

def remove_timeout_timer(self):
    ioloop.IOLoop.instance().remove_timeout(self.timeout)
    self.timeout = None

def on_message(self, message):
    if message = 'AUTHENTICATE':
        self.authenticated = True
        self.remove_timeout_timer

def _close_on_timeout(self):
    if not self.authenticated:
        if self.ws_connection:
            self.close()

这是我使用 aiohttp 设置计时器的方法:

async def ensure_client_logged(ws):
    await asyncio.sleep(3)  # wait 3 seconds
    await ws.send_str('hello')

async def ws_handler(request):
    ws = web.WebSocketResponse()

    asyncio.ensure_future(ensure_client_logged(ws), loop=request.app.loop)

但代码是运行阻塞方式,这意味着服务器在休眠时变得无响应。

有人能给我指出正确的方向吗?

您需要为身份验证程序设定截止日期。 asyncio.wait_for 是一种方便的方法:

async def ws_handler(request):
    loop = asyncio.get_event_loop()
    ws = web.WebSocketResponse()
    loop.create_task(handle_client(ws))

async def handle_client(ws):
    try:
        authenticated = await asyncio.wait_for(_authenticate(ws), 10)
    except asyncio.TimeoutError:
        authenticated = False
    if not authenticated:
        ws.close()
        return
    # continue talking to the client

async def _authenticate(ws):
    # implement authentication here, without worrying about
    # timeout - the coroutine will be automatically canceled
    # once the timeout elapses
    ...
    return True  # if successfully authenticated

这是一个完整的工作示例,有利于未来的用户:

from aiohttp import web
import asyncio

async def wait_for_authentication(ws, app):
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT and msg.data == 'AUTHENTICATE':  # Implement your own authentication
            await ws.send_str('WELCOME')
            return True
        else:
            await ws.send_str('NOT AUTHENTICATED')


async def authenticate(ws, app) -> bool:
    try:
        authenticated = await asyncio.wait_for(wait_for_authentication(ws, app), 5)
    except asyncio.TimeoutError:
        authenticated = False

    if not authenticated:
        await ws.send_str('The AUTHENTICATE command was not received. Closing the connection...')
        await ws.close()
        return False


async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    await request.app.loop.create_task(authenticate(ws, request.app))

    async for msg in ws:
        if msg.type != web.WSMsgType.TEXT:
            continue

        await ws.send_str(msg.data)

def init():
    app = web.Application()

    app.router.add_get('/', ws_handler)

    return app

web.run_app(init())