如何基于 python-asyncio 的传输和协议 api 实现 sock.recv() 之类的逻辑?

How to implement logic like sock.recv(), based on python-asyncio's transport & protocol api?

我正在尝试基于 asyncio tcp 构建一些简单的应用程序。在传统的socket编程中,我们使用sock.recv()和sock.send()来管理socket的接收和发送,但我注意到根据asyncio文档不推荐直接使用sockets,相应地,他们建议使用传输抽象。

我想知道如何使用传输来重现类似于传统套接字编程的逻辑。例如我想实现以下逻辑:

async def main():
    loop = asyncio.get_running_loop()
    transport, protocal = await loop.create_connection(EchoClientProtocol(), '', 25000)
    await transport.write("hello")
    await transport.read(5) # Error
    ....

上面的代码不行,因为transport一开始没有提供read方法,read事件必须在相应的协议中实现。这使我无法清楚地区分不同的 tcp 包。正确的做法是什么?谢谢

您可以使用 asyncio streams

实现 TCP 服务器和客户端

编辑 基于@user4815162342 很好的建议:

我将 read chuck 上的最大字节数从 1 字节增加到 8192 字节,在示例中使用尽可能小的数字是我的坏主意,这可能会误导其他人。

此外,BytesIO+= 字节更适合串联。我向这个代码示例介绍了 BytesIO

服务器脚本示例:

import asyncio
import socket
from io import BytesIO


async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print(len(asyncio.all_tasks()))  # let's show number of tasks
    ip, port = writer.get_extra_info('peername')  # get info about incoming connection
    print(f"Incoming connection from {ip}: {port}")
    # better use BytesIO than += if you gonna concat many times
    all_data = BytesIO()
    while True:
        try:
            # read chunk up to 8 kbytes
            data = await asyncio.wait_for(reader.read(8192), timeout=2.0)
            all_data.write(data)
            if reader.at_eof():
                print(f"Received data:\n{all_data.getvalue().decode('utf8')}")
                break
        except (asyncio.CancelledError, asyncio.TimeoutError):
            print("Too slow connection aborted")
            break

    writer.write(b"FROM_SERVER:\n")  # prepare data
    writer.write(all_data.getvalue())  # prepare more data
    # simulate slow server
    # await asyncio.sleep(5)
    await writer.drain()  # send all prepared data

    if writer.can_write_eof():
        writer.write_eof()

    writer.close()  # do not forget to close stream


async def main_server():
    server = await asyncio.start_server(
        client_connected_cb=handler,
        host="localhost",
        port=8888,
        family=socket.AF_INET,  # ipv4
    )

    ip, port = server.sockets[0].getsockname()
    print(f"Serving on: {ip}:{port}")
    print("*" * 200)

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main_server())

客户端脚本示例:

import asyncio
from io import BytesIO


async def main():
    reader, writer = await asyncio.open_connection(host="localhost", port=8888)
    # remove comment to test slow client
    # await asyncio.sleep(20)
    for i in range(10):
        writer.write(f"hello-{i}\n".encode("utf8"))  # prepare data
        await writer.drain()  # send data

    if writer.can_write_eof():
        writer.write_eof()  # tell server that we sent all data

    # better use BytesIO than += if you gonna concat many times
    data_from_server = BytesIO()  # now get server answer
    try:
        while True:
            # read chunk up to 8 kbytes
            data = await asyncio.wait_for(reader.read(8192), timeout=1.0)
            data_from_server.write(data)
            # if server told use that no more data
            if reader.at_eof():
                break

        print(data_from_server.getvalue().decode('utf8'))
        writer.close()
    except ConnectionAbortedError:
        # if our client was too slow
        print("Server timed out connection")
        writer.close()
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # if server was too slow
        print("Did not get answer from server due to timeout")
        writer.close()

if __name__ == '__main__':
    asyncio.run(main())