使用异步服务器正确发送消息 Python
Sending messages correctly with Async Server Python
我 运行 遇到了我设置的异步服务器的问题。我已经对该主题进行了大量研究,但似乎找不到任何有用的信息。基本上我的服务器正在监听一个端口,但是当我从“客户端”发送消息时,它似乎正在发送大量消息。
以前我一直在使用多线程套接字服务器,但经过一些研究,异步服务器似乎更适用于我的项目。
我不确定设置服务器的最佳方式,因此我们将不胜感激任何关于最佳实践的建设性反馈。
lia_server.py
import logging
import asyncio
from datetime import datetime
from os import path
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=f'{path.abspath("../log")}'
f'\srv_{datetime.strftime(datetime.today(), "%m_%d_%Y_%H_%M_%S")}',
filemode='w')
header = 1024
msg_format = 'utf-8'
disconnect_message = "BYE"
async def handle_client(reader, writer):
request = None
while request != disconnect_message:
request = (await reader.read()).decode(msg_format)
logging.info(f'[Message Received] {request}')
response = str(request)
writer.write(response.encode(msg_format))
await writer.drain()
writer.close()
class LiaServer:
def __init__(self, host, port):
self.server = host
self.port = port
logging.info(f'LiaServer Class object has been created with values of {self.server}:{self.port}')
async def run_server(self):
logging.info(f'[Server Start] Attempting to start the server now')
self.server_instance = await asyncio.start_server(handle_client, self.server, self.port)
async with self.server_instance:
logging.info(f'[Server Start] Server is now LISTENING on {self.server}:{self.port}')
await self.server_instance.serve_forever()
def stop_server(self):
logging.info(f'[Server Stop] Attempting to stop the server now')
pass
async def main():
srv_cls = LiaServer('localhost', '1338')
taskserver = asyncio.create_task(srv_cls.run_server())
await taskserver
if __name__ == "__main__":
asyncio.run(main())
client_test.py
import socket
import lia_server
import asyncio
msg_format = 'utf-8'
header = lia_server.header
disconnect_message = lia_server.disconnect_message
async def receive_cfg_and_send_msg(cls, msg: str):
address = (cls.server, cls.port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(address)
await send(client, msg)
async def send(client, msg):
message = msg.encode(msg_format)
msg_length = len(message)
send_length = str(msg_length).encode(msg_format)
send_length += b' ' * (header - len(send_length))
client.send(message)
def main():
server = lia_server.LiaServer('localhost', 1338)
asyncio.run(receive_cfg_and_send_msg(server, "Hello World"))
asyncio.run(receive_cfg_and_send_msg(server, disconnect_message))
print("Disconnected")
if __name__ == "__main__":
main()
然后每当 运行 服务器,然后是客户端,就会发生这种情况
日志
2021-07-14_00:10:21 root INFO LiaServer Class object has been created with values of localhost:1338
2021-07-14_00:10:21 root INFO [Server Start] Attempting to start the server now
2021-07-14_00:10:21 root INFO [Server Start] Server is now LISTENING on localhost:1338
2021-07-14_00:10:33 root INFO [Message Received] Hello World
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
The [Message Received] Lines keep repeating for a very long time.
服务器的行为让我觉得它一直在循环,但我不确定具体原因。
我希望让服务器接收消息,然后解析消息并检查它是否是断开连接消息。之后服务器向客户端发送响应,说回断开连接消息。
提前致谢!
您的代码存在多个问题。导致您看到的直接问题的原因是您正在使用 await reader.read()
从客户端读取“消息”。 TCP 是一种流协议,因此它没有消息的概念,只有按客户端发送它们的顺序到达的字节流的概念。 read()
方法将读取 所有 数据直到 EOF。这几乎肯定不是您想要的。由于您定义了“header”,您可能应该调用 reader.readexactly(header)
。然后您应该从消息中去除尾随空格,然后您才应该尝试将它与已知字符串进行匹配。
下一个问题是条件while request != disconnect_message
。如果遇到 end-of-file 条件,StreamReader.read
将 return 空字节 object、b''
。这将不等于断开连接消息,因此您的循环将继续循环,每个后续 read()
再次 returning b''
再次指示 EOF。这会导致您看到的无限循环,您可以通过检查 disconnect_message
或 b''
.
来修复它
您的客户端也有一个问题:它创建了两个完全独立的服务器连接 - 请注意 receive_cfg_and_send_msg
如何打开一个新连接。第一个连接在函数退出时被垃圾收集器关闭,这导致服务器上的无限循环。第二个连接试图到达服务器,但是服务器现在完全陷入了第一个连接的无限循环。 (参见 this answer 的解释 为什么 尽管显然在等待,但它仍然卡住了。)
最后,客户端包含对 asyncio.run()
的多次调用。虽然您可以这样做,但这不是应该使用 asyncio.run
的方式 - 您可能应该有一个 async def main()
函数并通过 asyncio.run(main())
调用它。在该函数中,您应该 await
需要 运行 的异步函数(或使用 asyncio.gather()
等组合它们)
我 运行 遇到了我设置的异步服务器的问题。我已经对该主题进行了大量研究,但似乎找不到任何有用的信息。基本上我的服务器正在监听一个端口,但是当我从“客户端”发送消息时,它似乎正在发送大量消息。
以前我一直在使用多线程套接字服务器,但经过一些研究,异步服务器似乎更适用于我的项目。
我不确定设置服务器的最佳方式,因此我们将不胜感激任何关于最佳实践的建设性反馈。
lia_server.py
import logging
import asyncio
from datetime import datetime
from os import path
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=f'{path.abspath("../log")}'
f'\srv_{datetime.strftime(datetime.today(), "%m_%d_%Y_%H_%M_%S")}',
filemode='w')
header = 1024
msg_format = 'utf-8'
disconnect_message = "BYE"
async def handle_client(reader, writer):
request = None
while request != disconnect_message:
request = (await reader.read()).decode(msg_format)
logging.info(f'[Message Received] {request}')
response = str(request)
writer.write(response.encode(msg_format))
await writer.drain()
writer.close()
class LiaServer:
def __init__(self, host, port):
self.server = host
self.port = port
logging.info(f'LiaServer Class object has been created with values of {self.server}:{self.port}')
async def run_server(self):
logging.info(f'[Server Start] Attempting to start the server now')
self.server_instance = await asyncio.start_server(handle_client, self.server, self.port)
async with self.server_instance:
logging.info(f'[Server Start] Server is now LISTENING on {self.server}:{self.port}')
await self.server_instance.serve_forever()
def stop_server(self):
logging.info(f'[Server Stop] Attempting to stop the server now')
pass
async def main():
srv_cls = LiaServer('localhost', '1338')
taskserver = asyncio.create_task(srv_cls.run_server())
await taskserver
if __name__ == "__main__":
asyncio.run(main())
client_test.py
import socket
import lia_server
import asyncio
msg_format = 'utf-8'
header = lia_server.header
disconnect_message = lia_server.disconnect_message
async def receive_cfg_and_send_msg(cls, msg: str):
address = (cls.server, cls.port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(address)
await send(client, msg)
async def send(client, msg):
message = msg.encode(msg_format)
msg_length = len(message)
send_length = str(msg_length).encode(msg_format)
send_length += b' ' * (header - len(send_length))
client.send(message)
def main():
server = lia_server.LiaServer('localhost', 1338)
asyncio.run(receive_cfg_and_send_msg(server, "Hello World"))
asyncio.run(receive_cfg_and_send_msg(server, disconnect_message))
print("Disconnected")
if __name__ == "__main__":
main()
然后每当 运行 服务器,然后是客户端,就会发生这种情况
日志
2021-07-14_00:10:21 root INFO LiaServer Class object has been created with values of localhost:1338
2021-07-14_00:10:21 root INFO [Server Start] Attempting to start the server now
2021-07-14_00:10:21 root INFO [Server Start] Server is now LISTENING on localhost:1338
2021-07-14_00:10:33 root INFO [Message Received] Hello World
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
2021-07-14_00:10:33 root INFO [Message Received]
The [Message Received] Lines keep repeating for a very long time.
服务器的行为让我觉得它一直在循环,但我不确定具体原因。
我希望让服务器接收消息,然后解析消息并检查它是否是断开连接消息。之后服务器向客户端发送响应,说回断开连接消息。
提前致谢!
您的代码存在多个问题。导致您看到的直接问题的原因是您正在使用 await reader.read()
从客户端读取“消息”。 TCP 是一种流协议,因此它没有消息的概念,只有按客户端发送它们的顺序到达的字节流的概念。 read()
方法将读取 所有 数据直到 EOF。这几乎肯定不是您想要的。由于您定义了“header”,您可能应该调用 reader.readexactly(header)
。然后您应该从消息中去除尾随空格,然后您才应该尝试将它与已知字符串进行匹配。
下一个问题是条件while request != disconnect_message
。如果遇到 end-of-file 条件,StreamReader.read
将 return 空字节 object、b''
。这将不等于断开连接消息,因此您的循环将继续循环,每个后续 read()
再次 returning b''
再次指示 EOF。这会导致您看到的无限循环,您可以通过检查 disconnect_message
或 b''
.
您的客户端也有一个问题:它创建了两个完全独立的服务器连接 - 请注意 receive_cfg_and_send_msg
如何打开一个新连接。第一个连接在函数退出时被垃圾收集器关闭,这导致服务器上的无限循环。第二个连接试图到达服务器,但是服务器现在完全陷入了第一个连接的无限循环。 (参见 this answer 的解释 为什么 尽管显然在等待,但它仍然卡住了。)
最后,客户端包含对 asyncio.run()
的多次调用。虽然您可以这样做,但这不是应该使用 asyncio.run
的方式 - 您可能应该有一个 async def main()
函数并通过 asyncio.run(main())
调用它。在该函数中,您应该 await
需要 运行 的异步函数(或使用 asyncio.gather()
等组合它们)