Python 3 websockets - 在关闭连接之前发送消息

Python 3 websockets - send message before closing connection

我是 Stack Overflow 的新手(虽然已经是长期 "stalker"!)所以请对我温柔点!

我正在尝试学习 Python,尤其是使用 websockets 的 Asyncio。

在网上搜索 examples/tutorials 我整理了以下微型聊天应用程序,可以在它变得更庞大(更多命令等)和变得难以重构之前使用一些建议。

我的主要问题是,为什么(在发送 DISCONNECT 命令时)它需要 asyncio.sleep(0) 才能在关闭连接之前发送断开连接验证消息?

除此之外,我的结构是否正确?

我觉得太多了 async/await 但我不太明白为什么。

盯着教程和 S/O 几个小时的帖子在这一点上似乎没有帮助,所以我想我应该直接得到一些专家的建议!

我们开始吧,响应 "nick"、"msg"、"test" 和 "disconnect" 命令的简单 WS 服务器。不需要前缀,即 "nick Rachel".

import asyncio
import websockets
import sys

class ChatServer:

    def __init__(self):
        print("Chat Server Starting..")
        self.Clients = set()
        if sys.platform == 'win32':
            self.loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(self.loop)
        else:
            self.loop = asyncio.get_event_loop()

    def run(self):
        start_server = websockets.serve(self.listen, '0.0.0.0', 8080)
        try:
            self.loop.run_until_complete(start_server)
            print("Chat Server Running!")
            self.loop.run_forever()
        except:
            print("Chat Server Error!")

    async def listen(self, websocket, path):

        client = Client(websocket=websocket)
        sender_task = asyncio.ensure_future(self.handle_outgoing_queue(client))

        self.Clients.add(client)
        print("+ connection: " + str(len(self.Clients)))

        while True:
            try:
                msg = await websocket.recv()
                if msg is None:
                    break

                await self.handle_message(client, msg)

            except websockets.exceptions.ConnectionClosed:
                break

        self.Clients.remove(client)
        print("- connection: " + str(len(self.Clients)))

    async def handle_outgoing_queue(self, client):
        while client.websocket.open:
            msg = await client.outbox.get()
            await client.websocket.send(msg)


    async def handle_message(self, client, data):

        strdata = data.split(" ")
        _cmd = strdata[0].lower()

        try:
            # Check to see if the command exists. Otherwise, AttributeError is thrown.
            func = getattr(self, "cmd_" + _cmd)

            try:
                await func(client, param, strdata)
            except IndexError:
                await client.send("Not enough parameters!")

        except AttributeError:
            await client.send("Command '%s' does not exist!" % (_cmd))

    # SERVER COMMANDS

    async def cmd_nick(self, client, param, strdata):
        # This command needs a parameter (with at least one character). If not supplied, IndexError is raised
        # Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
        test = param[1][0]


        # If we've reached this point there's definitely a parameter supplied
        client.Nick = param[1]
        await client.send("Your nickname is now %s" % (client.Nick))

    async def cmd_msg(self, client, param, strdata):
        # This command needs a parameter (with at least one character). If not supplied, IndexError is raised
        # Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
        test = param[1][0]

        # If we've reached this point there's definitely a parameter supplied
        message = strdata.split(" ",1)[1]

        # Before we proceed, do we have a nickname?
        if client.Nick == None:
            await client.send("You must choose a nickname before sending messages!")
            return

        for each in self.Clients:
            await each.send("%s says: %s" % (client.Nick, message))

    async def cmd_test(self, client, param, strdata):
        # This command doesn't need a parameter, so simply let the client know they issued this command successfully.
        await client.send("Test command reply!")

    async def cmd_disconnect(self, client, param, strdata):
        # This command doesn't need a parameter, so simply let the client know they issued this command successfully.
        await client.send("DISCONNECTING")
        await asyncio.sleep(0)      # If this isn't here we don't receive the "disconnecting" message - just an exception in "handle_outgoing_queue" ?
        await client.websocket.close()


class Client():
    def __init__(self, websocket=None):
        self.websocket = websocket
        self.IPAddress = websocket.remote_address[0]
        self.Port = websocket.remote_address[1]

        self.Nick = None
        self.outbox = asyncio.Queue()

    async def send(self, data):
        await self.outbox.put(data)

chat = ChatServer()
chat.run()

您的代码使用无限大小 Queues,这意味着 .put() 立即调用 .put_nowait() 和 returns。 (如果您确实想在代码中保留这些队列,请考虑在队列中使用 'None' 作为关闭连接并将 client.websocket.close() 移动到 handle_outgoing_queue() 的信号)。

另一个问题:考虑将 for x in seq: await co(x) 替换为 await asyncio.wait([co(x) for x in seq])。尝试使用 asyncio.sleep(1) 来体验显着的差异。

我相信一个更好的选择是删除所有发件箱 Queues 并仅中继内置的异步队列和 ensure_future。 websockets 包已经在其实现中包含队列。

我想指出的是,websockets 的作者在 2017 年 7 月 17 日的 post 中指出,websockets 曾经 return None 当连接关闭但在某些时候发生了变化。相反,他建议您使用 try 来处理异常。 OP 的代码显示了对 None 和 try/except 的检查。 None 检查不必要地冗长而且显然不准确,因为在当前版本中,websocket.recv() 在客户端关闭时不会 return 任何东西。

解决 "main" 问题,它看起来像是某种竞争条件。请记住,asyncio 是通过四处走动并触摸所有等待的元素来推动它们前进的。如果您的 'close connection' 命令在您的队列被清除之前的某个时间点被处理,客户端将永远不会收到队列中的最后一条消息。添加 async.sleep 会向循环法添加一个额外的步骤,并且可能会将您的队列清空任务置于 'close connection' 之前。

解决等待的数量,就是要实现目标需要发生多少异步事情。如果你在任何时候阻止,你将停止所有你想继续进行的其他任务。