Python Asyncio Network Client StreamWriter 不正常发送数据,只是在 EventLoop 关闭时

Python Asyncio Network Client StreamWriter does not normally send data, Just on EventLoop closing

我真的试着让这个post不大,但是做不到...

我正在尝试理解 python asyncio/streaming,为此我正在编写服务器和客户端代码。在服务器接收到的每个连接上,它都会创建两个新的协程,一个用于处理接收到的数据,另一个用于发送数据(无论是响应还是没有请求的状态消息)。处理数据必须是协程,因为它将访问和修改内部数据,但对于这些示例,它只会确认并将请求发送回客户端。 process_coroutine 准备一个响应并将其放入发送队列,以便 send_coroutine 可以发送它。代码如下所示,我目前有一个奇怪的行为。客户端只有在我Ctrl+C强行退出客户端程序后才会发送数据,嗯,至少我是这样总结的。在我的睾丸中,我 运行 有两个客户端,就像我在强制退出一个客户端后说的那样 gets/receives 数据并将其发回(现在只发送给另一个客户端,因为第一个客户端已关闭)。

服务器代码:

import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict

# Dictionary storing messages to send per writer
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)

# Dictionary storing request messages to process
RQST_QUEUES: Dict[StreamWriter, Queue] = defaultdict(Queue)

async def client(reader: StreamReader, writer: StreamWriter):
    peername = writer.get_extra_info('peername')
    print(f'Remote {peername} connected')

    send_task = asyncio.create_task(send_client(writer, SEND_QUEUES[writer]))
    process_task = asyncio.create_task(process_rqst(writer, RQST_QUEUES[writer]))
    
    # Send a test message
    await SEND_QUEUES[writer].put("Conenction Opened\r\n")

    try:
        while data := await reader.readline():
            print(f"DATA_RCVD: {data.decode()}")
            # Queue message for processing
            await RQST_QUEUES[writer].put(data.decode().rstrip())
            
    except asyncio.CancelledError:
        print(f'Remote {peername} connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'Remote {peername} disconnected')
    finally:
        print(f'Remote {peername} closed')
        await SEND_QUEUES[writer].put(None)
        await send_task
        del SEND_QUEUES[writer]

        await RQST_QUEUES[writer].put(None)
        await process_task
        del RQST_QUEUES[writer]

async def send_client(writer: StreamWriter, queue: Queue):
    while True:
        try:
            data = await queue.get()
        except asyncio.CancelledError:
            continue

        if not data:
            break

        try:
            writer.write(data.encode())
            await writer.drain()
        except asyncio.CancelledError:
            writer.write(data.encode())
            await writer.drain()

    writer.close()
    await writer.wait_closed()

async def process_rqst(writer: StreamWriter, queue: Queue):
    with suppress(asyncio.CancelledError):
        while True:
            print(f"PROCESS - awaiting RQST_MSG")
            if not (msg := await queue.get()):
                break
            
            # Instead of processing, just acknowledge it for now and send to ever connection
            for writer in SEND_QUEUES:
                    if not SEND_QUEUES[writer].full():
                        print(f"PROCESS - Add data to writer {writer.get_extra_info('peername')}\r\n")
                        await SEND_QUEUES[writer].put(f"ACK {msg}\r\n")

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main(client, host='', port=25000))
except KeyboardInterrupt:
    print('\r\nBye!')

客户端有两个协程,一个是从键盘接收数据并发送给服务器,另一个是从服务器接收数据。这是客户端的代码:

import argparse
import asyncio
import aioconsole
from asyncio import StreamReader, StreamWriter

async def msg_reader(reader: StreamReader):
    print(f"READER - msg_reader initialized")
    try:
        while data := await reader.readline():
            print(f"{data.decode()}\r\n> ")
        
        print(f"READER - Connection Ended")
    
    except asyncio.CancelledError as cerr:
        print(f"\r\nREADER ERR - {cerr}")
        print(f'READER - Remote connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'\r\nREADER - Remote disconnected')
    finally:
        print(f'READER - Remote closed')

async def msg_writer(writer: StreamWriter):
    print(f'WRITER - msg_writer {writer.get_extra_info("sockname")} initialized')
    try:
        while True:
            msg = await aioconsole.ainput("> ")

            writer.write(msg.encode())
            await writer.drain()
    
    except asyncio.CancelledError as cerr:
        print(f"\r\nWRITER ERR - {cerr}")
        print(f'WRITER - Remote connection cancelled.')
    finally:
        print(f'WRITER - Remote closed')
        writer.close()
        await writer.wait_closed()

async def main():
    parser = argparse.ArgumentParser(description = "This is the client for the multi threaded socket server!")
    parser.add_argument('--host', metavar = 'host', type = str, nargs = '?', default = "127.0.0.1")
    parser.add_argument('--port', metavar = 'port', type = int, nargs = '?', default = 25000)
    args = parser.parse_args()

    print(f"Connecting to server: {args.host} on port: {args.port}")

    reader, writer = await asyncio.open_connection(host=args.host, port=args.port)

    await asyncio.gather(msg_reader(reader), msg_writer(writer))

try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('\r\nBye!')

现在我介绍操作流程。 服务器 运行宁:

python3 sof_asyncio_server.py
Remote ('127.0.0.1', 50261) connected
PROCESS - awaiting RQST_MSG
Remote ('127.0.0.1', 50263) connected
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 01teste o2
Remote ('127.0.0.1', 50263) closed
PROCESS - Add data to writer ('127.0.0.1', 50261)

PROCESS - Add data to writer ('127.0.0.1', 50263)

PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 03
Remote ('127.0.0.1', 50261) closed
PROCESS - Adicionando dados ao writer ('127.0.0.1', 50261)

PROCESS - awaiting RQST_MSG

客户 1:

python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50263) initialized
Conenction Opened

> 
> teste 01
> teste o2
> ^C
READER ERR - 
READER - Remote connection cancelled.
READER - Remote closed

WRITER ERR - 
WRITER - Remote connection cancelled.
WRITER - Remote closed

Bye!

客户 2:

python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50261) initialized
Conenction Opened

> 
> teste 03
> ACK teste 01teste o2

> 
^C
READER ERR - 
READER - Remote connection cancelled.
READER - Remote closed

WRITER ERR - 
WRITER - Remote connection cancelled.
WRITER - Remote closed

Bye!

正如我所说,只有在我在第一个客户端上按下 Crtl + C 后,它才真正将数据发送到服务器,并且可以在服务器输出和客户端 02 输出的响应中看到。同样,只有在我在客户端 2 上按 Ctrl + C 后,它才发送数据并且可以在服务器输出上看到。

为什么客户端不发送数据?或者可能是服务器没有收到它?我认为是第一个选项,但无法弄清楚为什么...

客户端立即发送数据,但它发送的消息不包括服务器期望的换行符,因为它使用 readline() 来读取消息。这就是为什么服务器只观察 EOF 之后的消息,此时 readline() returns 累积数据,即使它没有以换行符结尾或包含换行符。

编写器中的循环需要如下所示:

while True:
    msg = await aioconsole.ainput("> ")
    writer.write(msg.encode())
    writer.write('\n')
    await writer.drain()

请注意,写入的字节除外,因此需要对'\n'进行编码。

while True:
    msg = await aioconsole.ainput("> ")
    writer.write(msg.encode())
    writer.write('\n'.encode())
    await writer.drain()