如何基于 python-asyncio 的传输和协议 api 实现 sock.recv() 之类的逻辑?
How to implement logic like sock.recv(), based on python-asyncio's transport & protocol api?
我正在尝试基于 asyncio tcp 构建一些简单的应用程序。在传统的socket编程中,我们使用sock.recv()和sock.send()来管理socket的接收和发送,但我注意到根据asyncio文档不推荐直接使用sockets,相应地,他们建议使用传输抽象。
我想知道如何使用传输来重现类似于传统套接字编程的逻辑。例如我想实现以下逻辑:
async def main():
loop = asyncio.get_running_loop()
transport, protocal = await loop.create_connection(EchoClientProtocol(), '', 25000)
await transport.write("hello")
await transport.read(5) # Error
....
上面的代码不行,因为transport一开始没有提供read方法,read事件必须在相应的协议中实现。这使我无法清楚地区分不同的 tcp 包。正确的做法是什么?谢谢
您可以使用 asyncio streams
实现 TCP 服务器和客户端
编辑 基于@user4815162342 很好的建议:
我将 read
chuck 上的最大字节数从 1 字节增加到 8192 字节,在示例中使用尽可能小的数字是我的坏主意,这可能会误导其他人。
此外,BytesIO
比 +=
字节更适合串联。我向这个代码示例介绍了 BytesIO
。
服务器脚本示例:
import asyncio
import socket
from io import BytesIO
async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
print(len(asyncio.all_tasks())) # let's show number of tasks
ip, port = writer.get_extra_info('peername') # get info about incoming connection
print(f"Incoming connection from {ip}: {port}")
# better use BytesIO than += if you gonna concat many times
all_data = BytesIO()
while True:
try:
# read chunk up to 8 kbytes
data = await asyncio.wait_for(reader.read(8192), timeout=2.0)
all_data.write(data)
if reader.at_eof():
print(f"Received data:\n{all_data.getvalue().decode('utf8')}")
break
except (asyncio.CancelledError, asyncio.TimeoutError):
print("Too slow connection aborted")
break
writer.write(b"FROM_SERVER:\n") # prepare data
writer.write(all_data.getvalue()) # prepare more data
# simulate slow server
# await asyncio.sleep(5)
await writer.drain() # send all prepared data
if writer.can_write_eof():
writer.write_eof()
writer.close() # do not forget to close stream
async def main_server():
server = await asyncio.start_server(
client_connected_cb=handler,
host="localhost",
port=8888,
family=socket.AF_INET, # ipv4
)
ip, port = server.sockets[0].getsockname()
print(f"Serving on: {ip}:{port}")
print("*" * 200)
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main_server())
客户端脚本示例:
import asyncio
from io import BytesIO
async def main():
reader, writer = await asyncio.open_connection(host="localhost", port=8888)
# remove comment to test slow client
# await asyncio.sleep(20)
for i in range(10):
writer.write(f"hello-{i}\n".encode("utf8")) # prepare data
await writer.drain() # send data
if writer.can_write_eof():
writer.write_eof() # tell server that we sent all data
# better use BytesIO than += if you gonna concat many times
data_from_server = BytesIO() # now get server answer
try:
while True:
# read chunk up to 8 kbytes
data = await asyncio.wait_for(reader.read(8192), timeout=1.0)
data_from_server.write(data)
# if server told use that no more data
if reader.at_eof():
break
print(data_from_server.getvalue().decode('utf8'))
writer.close()
except ConnectionAbortedError:
# if our client was too slow
print("Server timed out connection")
writer.close()
except (asyncio.TimeoutError, asyncio.CancelledError):
# if server was too slow
print("Did not get answer from server due to timeout")
writer.close()
if __name__ == '__main__':
asyncio.run(main())
我正在尝试基于 asyncio tcp 构建一些简单的应用程序。在传统的socket编程中,我们使用sock.recv()和sock.send()来管理socket的接收和发送,但我注意到根据asyncio文档不推荐直接使用sockets,相应地,他们建议使用传输抽象。
我想知道如何使用传输来重现类似于传统套接字编程的逻辑。例如我想实现以下逻辑:
async def main():
loop = asyncio.get_running_loop()
transport, protocal = await loop.create_connection(EchoClientProtocol(), '', 25000)
await transport.write("hello")
await transport.read(5) # Error
....
上面的代码不行,因为transport一开始没有提供read方法,read事件必须在相应的协议中实现。这使我无法清楚地区分不同的 tcp 包。正确的做法是什么?谢谢
您可以使用 asyncio streams
编辑 基于@user4815162342 很好的建议:
我将 read
chuck 上的最大字节数从 1 字节增加到 8192 字节,在示例中使用尽可能小的数字是我的坏主意,这可能会误导其他人。
此外,BytesIO
比 +=
字节更适合串联。我向这个代码示例介绍了 BytesIO
。
服务器脚本示例:
import asyncio
import socket
from io import BytesIO
async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
print(len(asyncio.all_tasks())) # let's show number of tasks
ip, port = writer.get_extra_info('peername') # get info about incoming connection
print(f"Incoming connection from {ip}: {port}")
# better use BytesIO than += if you gonna concat many times
all_data = BytesIO()
while True:
try:
# read chunk up to 8 kbytes
data = await asyncio.wait_for(reader.read(8192), timeout=2.0)
all_data.write(data)
if reader.at_eof():
print(f"Received data:\n{all_data.getvalue().decode('utf8')}")
break
except (asyncio.CancelledError, asyncio.TimeoutError):
print("Too slow connection aborted")
break
writer.write(b"FROM_SERVER:\n") # prepare data
writer.write(all_data.getvalue()) # prepare more data
# simulate slow server
# await asyncio.sleep(5)
await writer.drain() # send all prepared data
if writer.can_write_eof():
writer.write_eof()
writer.close() # do not forget to close stream
async def main_server():
server = await asyncio.start_server(
client_connected_cb=handler,
host="localhost",
port=8888,
family=socket.AF_INET, # ipv4
)
ip, port = server.sockets[0].getsockname()
print(f"Serving on: {ip}:{port}")
print("*" * 200)
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main_server())
客户端脚本示例:
import asyncio
from io import BytesIO
async def main():
reader, writer = await asyncio.open_connection(host="localhost", port=8888)
# remove comment to test slow client
# await asyncio.sleep(20)
for i in range(10):
writer.write(f"hello-{i}\n".encode("utf8")) # prepare data
await writer.drain() # send data
if writer.can_write_eof():
writer.write_eof() # tell server that we sent all data
# better use BytesIO than += if you gonna concat many times
data_from_server = BytesIO() # now get server answer
try:
while True:
# read chunk up to 8 kbytes
data = await asyncio.wait_for(reader.read(8192), timeout=1.0)
data_from_server.write(data)
# if server told use that no more data
if reader.at_eof():
break
print(data_from_server.getvalue().decode('utf8'))
writer.close()
except ConnectionAbortedError:
# if our client was too slow
print("Server timed out connection")
writer.close()
except (asyncio.TimeoutError, asyncio.CancelledError):
# if server was too slow
print("Did not get answer from server due to timeout")
writer.close()
if __name__ == '__main__':
asyncio.run(main())