Python Asyncio Network Client StreamWriter 不正常发送数据,只是在 EventLoop 关闭时
Python Asyncio Network Client StreamWriter does not normally send data, Just on EventLoop closing
我真的试着让这个post不大,但是做不到...
我正在尝试理解 python asyncio/streaming,为此我正在编写服务器和客户端代码。在服务器接收到的每个连接上,它都会创建两个新的协程,一个用于处理接收到的数据,另一个用于发送数据(无论是响应还是没有请求的状态消息)。处理数据必须是协程,因为它将访问和修改内部数据,但对于这些示例,它只会确认并将请求发送回客户端。 process_coroutine 准备一个响应并将其放入发送队列,以便 send_coroutine 可以发送它。代码如下所示,我目前有一个奇怪的行为。客户端只有在我Ctrl+C强行退出客户端程序后才会发送数据,嗯,至少我是这样总结的。在我的睾丸中,我 运行 有两个客户端,就像我在强制退出一个客户端后说的那样 gets/receives 数据并将其发回(现在只发送给另一个客户端,因为第一个客户端已关闭)。
服务器代码:
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
# Dictionary storing messages to send per writer
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
# Dictionary storing request messages to process
RQST_QUEUES: Dict[StreamWriter, Queue] = defaultdict(Queue)
async def client(reader: StreamReader, writer: StreamWriter):
peername = writer.get_extra_info('peername')
print(f'Remote {peername} connected')
send_task = asyncio.create_task(send_client(writer, SEND_QUEUES[writer]))
process_task = asyncio.create_task(process_rqst(writer, RQST_QUEUES[writer]))
# Send a test message
await SEND_QUEUES[writer].put("Conenction Opened\r\n")
try:
while data := await reader.readline():
print(f"DATA_RCVD: {data.decode()}")
# Queue message for processing
await RQST_QUEUES[writer].put(data.decode().rstrip())
except asyncio.CancelledError:
print(f'Remote {peername} connection cancelled.')
except asyncio.IncompleteReadError:
print(f'Remote {peername} disconnected')
finally:
print(f'Remote {peername} closed')
await SEND_QUEUES[writer].put(None)
await send_task
del SEND_QUEUES[writer]
await RQST_QUEUES[writer].put(None)
await process_task
del RQST_QUEUES[writer]
async def send_client(writer: StreamWriter, queue: Queue):
while True:
try:
data = await queue.get()
except asyncio.CancelledError:
continue
if not data:
break
try:
writer.write(data.encode())
await writer.drain()
except asyncio.CancelledError:
writer.write(data.encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def process_rqst(writer: StreamWriter, queue: Queue):
with suppress(asyncio.CancelledError):
while True:
print(f"PROCESS - awaiting RQST_MSG")
if not (msg := await queue.get()):
break
# Instead of processing, just acknowledge it for now and send to ever connection
for writer in SEND_QUEUES:
if not SEND_QUEUES[writer].full():
print(f"PROCESS - Add data to writer {writer.get_extra_info('peername')}\r\n")
await SEND_QUEUES[writer].put(f"ACK {msg}\r\n")
async def main(*args, **kwargs):
server = await asyncio.start_server(*args, **kwargs)
async with server:
await server.serve_forever()
try:
asyncio.run(main(client, host='', port=25000))
except KeyboardInterrupt:
print('\r\nBye!')
客户端有两个协程,一个是从键盘接收数据并发送给服务器,另一个是从服务器接收数据。这是客户端的代码:
import argparse
import asyncio
import aioconsole
from asyncio import StreamReader, StreamWriter
async def msg_reader(reader: StreamReader):
print(f"READER - msg_reader initialized")
try:
while data := await reader.readline():
print(f"{data.decode()}\r\n> ")
print(f"READER - Connection Ended")
except asyncio.CancelledError as cerr:
print(f"\r\nREADER ERR - {cerr}")
print(f'READER - Remote connection cancelled.')
except asyncio.IncompleteReadError:
print(f'\r\nREADER - Remote disconnected')
finally:
print(f'READER - Remote closed')
async def msg_writer(writer: StreamWriter):
print(f'WRITER - msg_writer {writer.get_extra_info("sockname")} initialized')
try:
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
await writer.drain()
except asyncio.CancelledError as cerr:
print(f"\r\nWRITER ERR - {cerr}")
print(f'WRITER - Remote connection cancelled.')
finally:
print(f'WRITER - Remote closed')
writer.close()
await writer.wait_closed()
async def main():
parser = argparse.ArgumentParser(description = "This is the client for the multi threaded socket server!")
parser.add_argument('--host', metavar = 'host', type = str, nargs = '?', default = "127.0.0.1")
parser.add_argument('--port', metavar = 'port', type = int, nargs = '?', default = 25000)
args = parser.parse_args()
print(f"Connecting to server: {args.host} on port: {args.port}")
reader, writer = await asyncio.open_connection(host=args.host, port=args.port)
await asyncio.gather(msg_reader(reader), msg_writer(writer))
try:
asyncio.run(main())
except KeyboardInterrupt:
print('\r\nBye!')
现在我介绍操作流程。
服务器 运行宁:
python3 sof_asyncio_server.py
Remote ('127.0.0.1', 50261) connected
PROCESS - awaiting RQST_MSG
Remote ('127.0.0.1', 50263) connected
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 01teste o2
Remote ('127.0.0.1', 50263) closed
PROCESS - Add data to writer ('127.0.0.1', 50261)
PROCESS - Add data to writer ('127.0.0.1', 50263)
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 03
Remote ('127.0.0.1', 50261) closed
PROCESS - Adicionando dados ao writer ('127.0.0.1', 50261)
PROCESS - awaiting RQST_MSG
客户 1:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50263) initialized
Conenction Opened
>
> teste 01
> teste o2
> ^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
客户 2:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50261) initialized
Conenction Opened
>
> teste 03
> ACK teste 01teste o2
>
^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
正如我所说,只有在我在第一个客户端上按下 Crtl + C 后,它才真正将数据发送到服务器,并且可以在服务器输出和客户端 02 输出的响应中看到。同样,只有在我在客户端 2 上按 Ctrl + C 后,它才发送数据并且可以在服务器输出上看到。
为什么客户端不发送数据?或者可能是服务器没有收到它?我认为是第一个选项,但无法弄清楚为什么...
客户端立即发送数据,但它发送的消息不包括服务器期望的换行符,因为它使用 readline()
来读取消息。这就是为什么服务器只观察 EOF 之后的消息,此时 readline()
returns 累积数据,即使它没有以换行符结尾或包含换行符。
编写器中的循环需要如下所示:
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
writer.write('\n')
await writer.drain()
请注意,写入的字节除外,因此需要对'\n'进行编码。
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
writer.write('\n'.encode())
await writer.drain()
我真的试着让这个post不大,但是做不到...
我正在尝试理解 python asyncio/streaming,为此我正在编写服务器和客户端代码。在服务器接收到的每个连接上,它都会创建两个新的协程,一个用于处理接收到的数据,另一个用于发送数据(无论是响应还是没有请求的状态消息)。处理数据必须是协程,因为它将访问和修改内部数据,但对于这些示例,它只会确认并将请求发送回客户端。 process_coroutine 准备一个响应并将其放入发送队列,以便 send_coroutine 可以发送它。代码如下所示,我目前有一个奇怪的行为。客户端只有在我Ctrl+C强行退出客户端程序后才会发送数据,嗯,至少我是这样总结的。在我的睾丸中,我 运行 有两个客户端,就像我在强制退出一个客户端后说的那样 gets/receives 数据并将其发回(现在只发送给另一个客户端,因为第一个客户端已关闭)。
服务器代码:
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
# Dictionary storing messages to send per writer
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
# Dictionary storing request messages to process
RQST_QUEUES: Dict[StreamWriter, Queue] = defaultdict(Queue)
async def client(reader: StreamReader, writer: StreamWriter):
peername = writer.get_extra_info('peername')
print(f'Remote {peername} connected')
send_task = asyncio.create_task(send_client(writer, SEND_QUEUES[writer]))
process_task = asyncio.create_task(process_rqst(writer, RQST_QUEUES[writer]))
# Send a test message
await SEND_QUEUES[writer].put("Conenction Opened\r\n")
try:
while data := await reader.readline():
print(f"DATA_RCVD: {data.decode()}")
# Queue message for processing
await RQST_QUEUES[writer].put(data.decode().rstrip())
except asyncio.CancelledError:
print(f'Remote {peername} connection cancelled.')
except asyncio.IncompleteReadError:
print(f'Remote {peername} disconnected')
finally:
print(f'Remote {peername} closed')
await SEND_QUEUES[writer].put(None)
await send_task
del SEND_QUEUES[writer]
await RQST_QUEUES[writer].put(None)
await process_task
del RQST_QUEUES[writer]
async def send_client(writer: StreamWriter, queue: Queue):
while True:
try:
data = await queue.get()
except asyncio.CancelledError:
continue
if not data:
break
try:
writer.write(data.encode())
await writer.drain()
except asyncio.CancelledError:
writer.write(data.encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def process_rqst(writer: StreamWriter, queue: Queue):
with suppress(asyncio.CancelledError):
while True:
print(f"PROCESS - awaiting RQST_MSG")
if not (msg := await queue.get()):
break
# Instead of processing, just acknowledge it for now and send to ever connection
for writer in SEND_QUEUES:
if not SEND_QUEUES[writer].full():
print(f"PROCESS - Add data to writer {writer.get_extra_info('peername')}\r\n")
await SEND_QUEUES[writer].put(f"ACK {msg}\r\n")
async def main(*args, **kwargs):
server = await asyncio.start_server(*args, **kwargs)
async with server:
await server.serve_forever()
try:
asyncio.run(main(client, host='', port=25000))
except KeyboardInterrupt:
print('\r\nBye!')
客户端有两个协程,一个是从键盘接收数据并发送给服务器,另一个是从服务器接收数据。这是客户端的代码:
import argparse
import asyncio
import aioconsole
from asyncio import StreamReader, StreamWriter
async def msg_reader(reader: StreamReader):
print(f"READER - msg_reader initialized")
try:
while data := await reader.readline():
print(f"{data.decode()}\r\n> ")
print(f"READER - Connection Ended")
except asyncio.CancelledError as cerr:
print(f"\r\nREADER ERR - {cerr}")
print(f'READER - Remote connection cancelled.')
except asyncio.IncompleteReadError:
print(f'\r\nREADER - Remote disconnected')
finally:
print(f'READER - Remote closed')
async def msg_writer(writer: StreamWriter):
print(f'WRITER - msg_writer {writer.get_extra_info("sockname")} initialized')
try:
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
await writer.drain()
except asyncio.CancelledError as cerr:
print(f"\r\nWRITER ERR - {cerr}")
print(f'WRITER - Remote connection cancelled.')
finally:
print(f'WRITER - Remote closed')
writer.close()
await writer.wait_closed()
async def main():
parser = argparse.ArgumentParser(description = "This is the client for the multi threaded socket server!")
parser.add_argument('--host', metavar = 'host', type = str, nargs = '?', default = "127.0.0.1")
parser.add_argument('--port', metavar = 'port', type = int, nargs = '?', default = 25000)
args = parser.parse_args()
print(f"Connecting to server: {args.host} on port: {args.port}")
reader, writer = await asyncio.open_connection(host=args.host, port=args.port)
await asyncio.gather(msg_reader(reader), msg_writer(writer))
try:
asyncio.run(main())
except KeyboardInterrupt:
print('\r\nBye!')
现在我介绍操作流程。 服务器 运行宁:
python3 sof_asyncio_server.py
Remote ('127.0.0.1', 50261) connected
PROCESS - awaiting RQST_MSG
Remote ('127.0.0.1', 50263) connected
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 01teste o2
Remote ('127.0.0.1', 50263) closed
PROCESS - Add data to writer ('127.0.0.1', 50261)
PROCESS - Add data to writer ('127.0.0.1', 50263)
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 03
Remote ('127.0.0.1', 50261) closed
PROCESS - Adicionando dados ao writer ('127.0.0.1', 50261)
PROCESS - awaiting RQST_MSG
客户 1:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50263) initialized
Conenction Opened
>
> teste 01
> teste o2
> ^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
客户 2:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50261) initialized
Conenction Opened
>
> teste 03
> ACK teste 01teste o2
>
^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
正如我所说,只有在我在第一个客户端上按下 Crtl + C 后,它才真正将数据发送到服务器,并且可以在服务器输出和客户端 02 输出的响应中看到。同样,只有在我在客户端 2 上按 Ctrl + C 后,它才发送数据并且可以在服务器输出上看到。
为什么客户端不发送数据?或者可能是服务器没有收到它?我认为是第一个选项,但无法弄清楚为什么...
客户端立即发送数据,但它发送的消息不包括服务器期望的换行符,因为它使用 readline()
来读取消息。这就是为什么服务器只观察 EOF 之后的消息,此时 readline()
returns 累积数据,即使它没有以换行符结尾或包含换行符。
编写器中的循环需要如下所示:
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
writer.write('\n')
await writer.drain()
请注意,写入的字节除外,因此需要对'\n'进行编码。
while True:
msg = await aioconsole.ainput("> ")
writer.write(msg.encode())
writer.write('\n'.encode())
await writer.drain()