Python 3 websockets - 在关闭连接之前发送消息
Python 3 websockets - send message before closing connection
我是 Stack Overflow 的新手(虽然已经是长期 "stalker"!)所以请对我温柔点!
我正在尝试学习 Python,尤其是使用 websockets 的 Asyncio。
在网上搜索 examples/tutorials 我整理了以下微型聊天应用程序,可以在它变得更庞大(更多命令等)和变得难以重构之前使用一些建议。
我的主要问题是,为什么(在发送 DISCONNECT 命令时)它需要 asyncio.sleep(0) 才能在关闭连接之前发送断开连接验证消息?
除此之外,我的结构是否正确?
我觉得太多了 async/await 但我不太明白为什么。
盯着教程和 S/O 几个小时的帖子在这一点上似乎没有帮助,所以我想我应该直接得到一些专家的建议!
我们开始吧,响应 "nick"、"msg"、"test" 和 "disconnect" 命令的简单 WS 服务器。不需要前缀,即 "nick Rachel".
import asyncio
import websockets
import sys
class ChatServer:
def __init__(self):
print("Chat Server Starting..")
self.Clients = set()
if sys.platform == 'win32':
self.loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(self.loop)
else:
self.loop = asyncio.get_event_loop()
def run(self):
start_server = websockets.serve(self.listen, '0.0.0.0', 8080)
try:
self.loop.run_until_complete(start_server)
print("Chat Server Running!")
self.loop.run_forever()
except:
print("Chat Server Error!")
async def listen(self, websocket, path):
client = Client(websocket=websocket)
sender_task = asyncio.ensure_future(self.handle_outgoing_queue(client))
self.Clients.add(client)
print("+ connection: " + str(len(self.Clients)))
while True:
try:
msg = await websocket.recv()
if msg is None:
break
await self.handle_message(client, msg)
except websockets.exceptions.ConnectionClosed:
break
self.Clients.remove(client)
print("- connection: " + str(len(self.Clients)))
async def handle_outgoing_queue(self, client):
while client.websocket.open:
msg = await client.outbox.get()
await client.websocket.send(msg)
async def handle_message(self, client, data):
strdata = data.split(" ")
_cmd = strdata[0].lower()
try:
# Check to see if the command exists. Otherwise, AttributeError is thrown.
func = getattr(self, "cmd_" + _cmd)
try:
await func(client, param, strdata)
except IndexError:
await client.send("Not enough parameters!")
except AttributeError:
await client.send("Command '%s' does not exist!" % (_cmd))
# SERVER COMMANDS
async def cmd_nick(self, client, param, strdata):
# This command needs a parameter (with at least one character). If not supplied, IndexError is raised
# Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
test = param[1][0]
# If we've reached this point there's definitely a parameter supplied
client.Nick = param[1]
await client.send("Your nickname is now %s" % (client.Nick))
async def cmd_msg(self, client, param, strdata):
# This command needs a parameter (with at least one character). If not supplied, IndexError is raised
# Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
test = param[1][0]
# If we've reached this point there's definitely a parameter supplied
message = strdata.split(" ",1)[1]
# Before we proceed, do we have a nickname?
if client.Nick == None:
await client.send("You must choose a nickname before sending messages!")
return
for each in self.Clients:
await each.send("%s says: %s" % (client.Nick, message))
async def cmd_test(self, client, param, strdata):
# This command doesn't need a parameter, so simply let the client know they issued this command successfully.
await client.send("Test command reply!")
async def cmd_disconnect(self, client, param, strdata):
# This command doesn't need a parameter, so simply let the client know they issued this command successfully.
await client.send("DISCONNECTING")
await asyncio.sleep(0) # If this isn't here we don't receive the "disconnecting" message - just an exception in "handle_outgoing_queue" ?
await client.websocket.close()
class Client():
def __init__(self, websocket=None):
self.websocket = websocket
self.IPAddress = websocket.remote_address[0]
self.Port = websocket.remote_address[1]
self.Nick = None
self.outbox = asyncio.Queue()
async def send(self, data):
await self.outbox.put(data)
chat = ChatServer()
chat.run()
您的代码使用无限大小 Queues
,这意味着 .put()
立即调用 .put_nowait()
和 returns。 (如果您确实想在代码中保留这些队列,请考虑在队列中使用 'None' 作为关闭连接并将 client.websocket.close()
移动到 handle_outgoing_queue()
的信号)。
另一个问题:考虑将 for x in seq: await co(x)
替换为 await asyncio.wait([co(x) for x in seq])
。尝试使用 asyncio.sleep(1)
来体验显着的差异。
我相信一个更好的选择是删除所有发件箱 Queue
s 并仅中继内置的异步队列和 ensure_future
。 websockets 包已经在其实现中包含队列。
我想指出的是,websockets 的作者在 2017 年 7 月 17 日的 post 中指出,websockets 曾经 return None 当连接关闭但在某些时候发生了变化。相反,他建议您使用 try 来处理异常。 OP 的代码显示了对 None 和 try/except 的检查。 None 检查不必要地冗长而且显然不准确,因为在当前版本中,websocket.recv() 在客户端关闭时不会 return 任何东西。
解决 "main" 问题,它看起来像是某种竞争条件。请记住,asyncio 是通过四处走动并触摸所有等待的元素来推动它们前进的。如果您的 'close connection' 命令在您的队列被清除之前的某个时间点被处理,客户端将永远不会收到队列中的最后一条消息。添加 async.sleep 会向循环法添加一个额外的步骤,并且可能会将您的队列清空任务置于 'close connection' 之前。
解决等待的数量,就是要实现目标需要发生多少异步事情。如果你在任何时候阻止,你将停止所有你想继续进行的其他任务。
我是 Stack Overflow 的新手(虽然已经是长期 "stalker"!)所以请对我温柔点!
我正在尝试学习 Python,尤其是使用 websockets 的 Asyncio。
在网上搜索 examples/tutorials 我整理了以下微型聊天应用程序,可以在它变得更庞大(更多命令等)和变得难以重构之前使用一些建议。
我的主要问题是,为什么(在发送 DISCONNECT 命令时)它需要 asyncio.sleep(0) 才能在关闭连接之前发送断开连接验证消息?
除此之外,我的结构是否正确?
我觉得太多了 async/await 但我不太明白为什么。
盯着教程和 S/O 几个小时的帖子在这一点上似乎没有帮助,所以我想我应该直接得到一些专家的建议!
我们开始吧,响应 "nick"、"msg"、"test" 和 "disconnect" 命令的简单 WS 服务器。不需要前缀,即 "nick Rachel".
import asyncio
import websockets
import sys
class ChatServer:
def __init__(self):
print("Chat Server Starting..")
self.Clients = set()
if sys.platform == 'win32':
self.loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(self.loop)
else:
self.loop = asyncio.get_event_loop()
def run(self):
start_server = websockets.serve(self.listen, '0.0.0.0', 8080)
try:
self.loop.run_until_complete(start_server)
print("Chat Server Running!")
self.loop.run_forever()
except:
print("Chat Server Error!")
async def listen(self, websocket, path):
client = Client(websocket=websocket)
sender_task = asyncio.ensure_future(self.handle_outgoing_queue(client))
self.Clients.add(client)
print("+ connection: " + str(len(self.Clients)))
while True:
try:
msg = await websocket.recv()
if msg is None:
break
await self.handle_message(client, msg)
except websockets.exceptions.ConnectionClosed:
break
self.Clients.remove(client)
print("- connection: " + str(len(self.Clients)))
async def handle_outgoing_queue(self, client):
while client.websocket.open:
msg = await client.outbox.get()
await client.websocket.send(msg)
async def handle_message(self, client, data):
strdata = data.split(" ")
_cmd = strdata[0].lower()
try:
# Check to see if the command exists. Otherwise, AttributeError is thrown.
func = getattr(self, "cmd_" + _cmd)
try:
await func(client, param, strdata)
except IndexError:
await client.send("Not enough parameters!")
except AttributeError:
await client.send("Command '%s' does not exist!" % (_cmd))
# SERVER COMMANDS
async def cmd_nick(self, client, param, strdata):
# This command needs a parameter (with at least one character). If not supplied, IndexError is raised
# Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
test = param[1][0]
# If we've reached this point there's definitely a parameter supplied
client.Nick = param[1]
await client.send("Your nickname is now %s" % (client.Nick))
async def cmd_msg(self, client, param, strdata):
# This command needs a parameter (with at least one character). If not supplied, IndexError is raised
# Is there a cleaner way of doing this? Otherwise it'll need to reside within all functions that require a param
test = param[1][0]
# If we've reached this point there's definitely a parameter supplied
message = strdata.split(" ",1)[1]
# Before we proceed, do we have a nickname?
if client.Nick == None:
await client.send("You must choose a nickname before sending messages!")
return
for each in self.Clients:
await each.send("%s says: %s" % (client.Nick, message))
async def cmd_test(self, client, param, strdata):
# This command doesn't need a parameter, so simply let the client know they issued this command successfully.
await client.send("Test command reply!")
async def cmd_disconnect(self, client, param, strdata):
# This command doesn't need a parameter, so simply let the client know they issued this command successfully.
await client.send("DISCONNECTING")
await asyncio.sleep(0) # If this isn't here we don't receive the "disconnecting" message - just an exception in "handle_outgoing_queue" ?
await client.websocket.close()
class Client():
def __init__(self, websocket=None):
self.websocket = websocket
self.IPAddress = websocket.remote_address[0]
self.Port = websocket.remote_address[1]
self.Nick = None
self.outbox = asyncio.Queue()
async def send(self, data):
await self.outbox.put(data)
chat = ChatServer()
chat.run()
您的代码使用无限大小 Queues
,这意味着 .put()
立即调用 .put_nowait()
和 returns。 (如果您确实想在代码中保留这些队列,请考虑在队列中使用 'None' 作为关闭连接并将 client.websocket.close()
移动到 handle_outgoing_queue()
的信号)。
另一个问题:考虑将 for x in seq: await co(x)
替换为 await asyncio.wait([co(x) for x in seq])
。尝试使用 asyncio.sleep(1)
来体验显着的差异。
我相信一个更好的选择是删除所有发件箱 Queue
s 并仅中继内置的异步队列和 ensure_future
。 websockets 包已经在其实现中包含队列。
我想指出的是,websockets 的作者在 2017 年 7 月 17 日的 post 中指出,websockets 曾经 return None 当连接关闭但在某些时候发生了变化。相反,他建议您使用 try 来处理异常。 OP 的代码显示了对 None 和 try/except 的检查。 None 检查不必要地冗长而且显然不准确,因为在当前版本中,websocket.recv() 在客户端关闭时不会 return 任何东西。
解决 "main" 问题,它看起来像是某种竞争条件。请记住,asyncio 是通过四处走动并触摸所有等待的元素来推动它们前进的。如果您的 'close connection' 命令在您的队列被清除之前的某个时间点被处理,客户端将永远不会收到队列中的最后一条消息。添加 async.sleep 会向循环法添加一个额外的步骤,并且可能会将您的队列清空任务置于 'close connection' 之前。
解决等待的数量,就是要实现目标需要发生多少异步事情。如果你在任何时候阻止,你将停止所有你想继续进行的其他任务。