异步服务器和客户端处理来自控制台的输入
asyncio server and client to handle input from console
我有一个异步 TCP 服务器,它从客户端获取消息,在服务器上执行 stuff() 并发回文本。服务器在正确接收和发送数据的意义上运行良好。问题是我无法在客户端从服务器取回消息,因为我在控制台输入时有阻塞例程(基本上 data_received 方法永远不会执行)。只有 exit 命令工作正常(它关闭循环)。
如何解决这个问题?这是服务器和客户端代码。它基本上是 EchoClient asyncio 版本,带有更多用于练习的管道代码。
# client.py
import abc
import asyncio
import sys
# Menu text printed once before the interactive prompt starts.
MENU = '''
a) do x
b) do y
c) exit
'''
# Module-level event loop, used by ExitCommand.run() and main() below.
loop_ = asyncio.get_event_loop()
class XCommand:
    """Menu command that asks the server to run its ``X`` operation."""

    def __init__(self, client):
        """Remember the connected client used to reach the server.

        Args:
            client: object exposing ``send_data_to_tcp(str)``.
        """
        # The class previously never stored the client, so ``XCommand(client)``
        # raised TypeError and ``run`` had no ``self.client`` to use.
        self.client = client

    def run(self):
        """Send the ``X:`` request; the client encodes it to bytes."""
        self.client.send_data_to_tcp('X:')
class YCommand(Command):
    """Menu command that sends user-typed data with the ``Y`` request."""

    def run(self):
        """Prompt on the console (blocking) and send ``Y:<text>``."""
        payload = input('Input for Y ### ')
        self.client.send_data_to_tcp('Y:{}'.format(payload))
class ExitCommand(Command):
    """Menu command that notifies the server and terminates the client."""

    def run(self):
        """Send ``EXIT:``, print a farewell and end the program.

        Raises:
            SystemExit: always.
        """
        self.client.send_data_to_tcp('EXIT:')
        print('Goodbye!')
        if loop_.is_running():
            # close() raises RuntimeError on a running loop; request a stop
            # instead (thread-safe, in case run() executes in a worker thread).
            loop_.call_soon_threadsafe(loop_.stop)
        else:
            loop_.close()
        sys.exit()
class CommandFactory:
    """Map a menu key typed by the user to its command class."""

    # The command classes defined in this client are XCommand / YCommand;
    # the former ACommand / BCommand names are not defined here and raised
    # NameError as soon as the class body was evaluated.
    _cmds = {
        'a': XCommand,
        'b': YCommand,
        'c': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class registered for ``cmd``, or ``None``."""
        return cls._cmds.get(cmd)
def show_menu(client):
    """Print the menu, then read and execute commands forever.

    Args:
        client: connected client handed to every command instance.
    """
    print(MENU)
    while True:
        choice = input('Insert Command$: ')
        command_type = CommandFactory.get_cmd(choice)
        if command_type:
            command_type(client).run()
        else:
            print('Unknown: {}'.format(choice))
class Client(asyncio.Protocol):
    """TCP client protocol that prints every reply from the server."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Keep the transport so data can be written later."""
        self.transport = transport

    def data_received(self, data):
        """Print the decoded server reply (shown as a repr)."""
        text = data.decode()
        print('Data received from server: \n{!r}'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()
def main():
    """Connect to the server, start the console menu and run the loop.

    The blocking menu is handed to the default executor so the event loop
    stays free to deliver ``data_received`` callbacks.
    """
    client = Client(loop_)
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    # Pass the callable and its argument separately: ``show_menu(client)``
    # would run the endless input loop right here, before the event loop
    # ever starts, so no reply from the server could be processed.
    # NOTE(review): commands now call ``transport.write`` from the worker
    # thread; asyncio objects are not thread-safe, so routing writes through
    # ``loop.call_soon_threadsafe`` is advisable -- confirm.
    loop_.run_in_executor(None, show_menu, client)
    try:
        loop_.run_forever()
    finally:
        loop_.close()


if __name__ == '__main__':
    main()
# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB
class ACommand:
    """Server-side handler for the ``X`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return ``db.a()`` rendered as ``key: value`` lines.

        A falsy result from ``db.a()`` yields a fixed "empty" banner instead.
        """
        rows = db.a()
        if rows:
            return '\n'.join('{}: {}'.format(*item) for item in rows.items())
        return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'
class BCommand:
    """Server-side handler for the ``Y`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Forward both parameters to ``db.b`` and return a fixed reply."""
        reply = 'B Ok!'
        db.b(param1, param2)
        return reply
class ExitCommand:
    """Server-side handler that shuts the whole server down."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Stop accepting connections and stop the event loop.

        This runs inside ``data_received`` while the loop is running, where
        ``loop.close()`` and ``loop.run_until_complete()`` both raise
        RuntimeError. Only request the stop here; the ``finally`` clause of
        the script entry point completes the shutdown once ``run_forever()``
        returns.

        Returns:
            The farewell text, so the caller can still answer the client.
        """
        farewell = 'Buona giornata!!!'
        server.close()
        loop.stop()
        print(farewell)
        return farewell
class CommandFactory:
    """Parse a ``NAME[:p1[:p2]]`` message and look up its command class."""

    _cmds = {'X': ACommand,
             'Y': BCommand,
             'EXIT': ExitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` into a command name and up to two params.

        Returns:
            ``(command_class_or_None, param1, param2)``. ``param2`` is only
            filled when the message has exactly three fields.
        """
        name, *args = cmd.split(':')
        if not args:
            first, second = None, None
        elif len(args) == 2:
            first, second = args
        else:
            first, second = args[0], None
        return cls._cmds.get(name), first, second
class Server(asyncio.Protocol):
    """Protocol that executes one command per received TCP message.

    ``db`` is a class attribute, so it is shared by every protocol instance.
    """

    db_filename = '../data/db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send its reply.

        An unrecognised command name gets an error reply instead of raising
        AttributeError on ``None.run`` inside the event-loop callback.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # A fresh Server protocol instance is built for every client connection.
    server = loop.run_until_complete(
        loop.create_server(Server, '127.0.0.1', 10888))
    # Keep serving until the user presses Ctrl+C.
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening, wait for the listener to close, release the loop.
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
更新:
解决方案是使用 aioconsole 包提供的异步输入函数 ainput。下面的代码运行良好。
# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB
class Command(metaclass=abc.ABCMeta):
    """Abstract base for the server-side command handlers."""

    # ``abc.abstractclassmethod`` is deprecated; stacking ``classmethod`` on
    # ``abstractmethod`` is the supported spelling. The signature now matches
    # the one every concrete command in this module implements.
    @classmethod
    @abc.abstractmethod
    def run(cls, db, param1=None, param2=None):
        """Execute the command against ``db`` and return the reply text.

        Raises:
            NotImplementedError: when reached through ``super()``.
        """
        raise NotImplementedError()
class XCommand(Command):
    """Server-side handler for the ``X`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return ``db.x()`` rendered as ``key: value`` lines.

        A falsy result from ``db.x()`` yields a fixed "empty" banner instead.
        """
        record = db.x()
        if record:
            return '\n'.join('{}: {}'.format(*pair) for pair in record.items())
        return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'
class YCommand(Command):
    """Server-side handler for the ``Y`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Pass ``param1`` to ``db.y`` and return a confirmation text."""
        template = 'Operation Y OK: {}'
        db.y(param1)
        return template.format(param1)
class QuitCommand(Command):
    """Server-side handler for the ``DISCONNECT`` request."""

    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        """Return the fixed disconnect reply; no argument is used."""
        reply = 'Disconnected...'
        return reply
class CommandFactory:
    """Parse a ``NAME[:p1[:p2]]`` message and look up its command class."""

    _cmds = {'X': XCommand,
             'Y': YCommand,
             'DISCONNECT': QuitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` into a command name and up to two params.

        Returns:
            ``(command_class_or_None, param1, param2)``. ``param2`` is only
            filled when the message has exactly three fields.
        """
        name, *args = cmd.split(':')
        if not args:
            first, second = None, None
        elif len(args) == 2:
            first, second = args
        else:
            first, second = args[0], None
        return cls._cmds.get(name), first, second
class Server(asyncio.Protocol):
    """Protocol that executes one command per received TCP message.

    ``db`` is a class attribute, so it is shared by every protocol instance.
    """

    db_filename = '../data/exercise.db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send its reply.

        An unrecognised command name gets an error reply instead of raising
        AttributeError on ``None.run`` inside the event-loop callback.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance.
    # The protocol class defined in this module is ``Server``; the former
    # ``RubricaServer`` name is not defined and raised NameError.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Close the server even when the loop ends because of an error
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
#dummydb.py
class DummyDB:
    """Database stub that returns canned data."""

    def __init__(self, fn):
        """Keep the file name; nothing is opened or read."""
        self.fn = fn

    def x(self):
        """Return a fixed three-field sample record."""
        return dict(
            field_a='55 tt TTYY 3334 gghyyujh',
            field_b='FF hhhnneeekk',
            field_c='00993342489048222 news',
        )

    def y(self, param):
        """Return ``param`` unchanged."""
        return param
# client.py
import abc
from asyncio import *
from aioconsole import ainput
# Menu text printed by menu() before the prompt loop starts.
MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''
# Module-level event loop, used by the script entry point below.
loop_ = get_event_loop()
class Command(metaclass=abc.ABCMeta):
    """Abstract base for the client-side menu commands."""

    # Subclasses whose ``run`` is a coroutine set this to True so the caller
    # knows the result must be awaited.
    asyn = False

    def __init__(self, tcp_client):
        """Keep the client used to send data to the server."""
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        """Execute the command; concrete subclasses must override this."""
        raise NotImplementedError
class ACommand(Command):
    """Menu entry A: ask the server to run its ``X`` command."""

    def run(self):
        """Send the ``X:`` request through the client."""
        request = 'X:'
        self.client.send_data_to_tcp(request)
class BCommand(Command):
    """Menu entry B: read extra input, then send the ``Y`` command."""

    # ``run`` is a coroutine, so the caller has to await it.
    asyn = True

    async def run(self):
        """Await one console line without blocking the loop; send ``Y:<line>``."""
        extra = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        self.client.send_data_to_tcp('Y:{}'.format(extra))
class QuitCommand(Command):
    """Menu entry C: notify the server, stop the loop and exit."""

    def run(self):
        """Send ``DISCONNECT:``, print a farewell, disconnect and exit."""
        tcp_client = self.client
        tcp_client.send_data_to_tcp('DISCONNECT:')
        print('Goodbye!!!')
        tcp_client.disconnect()
        exit()
class CommandFactory:
    """Map a menu letter typed by the user to its command class."""

    _cmds = {
        'A': ACommand,
        'B': BCommand,
        'C': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the class for ``cmd`` (whitespace-trimmed), or ``None``."""
        return cls._cmds.get(cmd.strip())
class Client(Protocol):
    """TCP client protocol that prints every reply from the server."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def disconnect(self):
        """Stop the event loop."""
        self.loop.stop()

    def connection_made(self, transport):
        """Keep the transport so data can be written later."""
        self.transport = transport

    def data_received(self, data):
        """Print the decoded server reply between two separator lines."""
        text = data.decode()
        print('Data received from server: \n===========\n{}\n===========\n'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()
def menu():
    """Print the command menu to the console."""
    print(MENU)
async def main():
    """Show the menu, then read and run commands without blocking the loop.

    Uses the module-level ``client`` created by the script entry point.
    """
    menu()
    while True:
        cmd = await ainput('Insert Command >')
        cmd_cls = CommandFactory.get_cmd(cmd)
        if not cmd_cls:
            print('Unknown: {}'.format(cmd))
            continue
        outcome = cmd_cls(client).run()
        if cmd_cls.asyn:
            # Coroutine-based commands return an awaitable from run().
            await outcome
if __name__ == '__main__':
    # ``client`` stays a module-level name because main() reads it.
    client = Client(loop_)
    # Connect first, then drive the interactive prompt coroutine.
    loop_.run_until_complete(
        loop_.create_connection(lambda: client, '127.0.0.1', 10888))
    loop_.run_until_complete(main())
您可以考虑使用 aioconsole.ainput:
from aioconsole import ainput
async def some_coroutine():
    """Await one line of console input via ``ainput``."""
    line = await ainput(">>> ")
    # Placeholder kept from the original snippet: further code goes here.
    [...]
该项目在 PyPI 可用。
另一种方法是使用 run_in_executor,
例如:
from functools import partial
from concurrent.futures.thread import ThreadPoolExecutor
async def f():
    """Read console lines forever, running ``input`` in a worker thread."""
    pool = ThreadPoolExecutor(1)
    # Pre-bind the executor so each call only needs the blocking function.
    rie = partial(asyncio.get_event_loop().run_in_executor, pool)
    while True:
        await rie(input)
我有一个异步 TCP 服务器,它从客户端获取消息,在服务器上执行 stuff() 并发回文本。服务器在正确接收和发送数据的意义上运行良好。问题是我无法在客户端从服务器取回消息,因为我在控制台输入时有阻塞例程(基本上 data_received 方法永远不会执行)。只有 exit 命令工作正常(它关闭循环)。 如何解决这个问题?这是服务器和客户端代码。它基本上是 EchoClient asyncio 版本,带有更多用于练习的管道代码。
# client.py
import abc
import asyncio
import sys
# Menu text printed once before the interactive prompt starts.
MENU = '''
a) do x
b) do y
c) exit
'''
# Module-level event loop, used by ExitCommand.run() and main() below.
loop_ = asyncio.get_event_loop()
class XCommand:
    """Menu command that asks the server to run its ``X`` operation."""

    def __init__(self, client):
        """Remember the connected client used to reach the server.

        Args:
            client: object exposing ``send_data_to_tcp(str)``.
        """
        # The class previously never stored the client, so ``XCommand(client)``
        # raised TypeError and ``run`` had no ``self.client`` to use.
        self.client = client

    def run(self):
        """Send the ``X:`` request; the client encodes it to bytes."""
        self.client.send_data_to_tcp('X:')
class YCommand(Command):
    """Menu command that sends user-typed data with the ``Y`` request."""

    def run(self):
        """Prompt on the console (blocking) and send ``Y:<text>``."""
        payload = input('Input for Y ### ')
        self.client.send_data_to_tcp('Y:{}'.format(payload))
class ExitCommand(Command):
    """Menu command that notifies the server and terminates the client."""

    def run(self):
        """Send ``EXIT:``, print a farewell and end the program.

        Raises:
            SystemExit: always.
        """
        self.client.send_data_to_tcp('EXIT:')
        print('Goodbye!')
        if loop_.is_running():
            # close() raises RuntimeError on a running loop; request a stop
            # instead (thread-safe, in case run() executes in a worker thread).
            loop_.call_soon_threadsafe(loop_.stop)
        else:
            loop_.close()
        sys.exit()
class CommandFactory:
    """Map a menu key typed by the user to its command class."""

    # The command classes defined in this client are XCommand / YCommand;
    # the former ACommand / BCommand names are not defined here and raised
    # NameError as soon as the class body was evaluated.
    _cmds = {
        'a': XCommand,
        'b': YCommand,
        'c': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class registered for ``cmd``, or ``None``."""
        return cls._cmds.get(cmd)
def show_menu(client):
    """Print the menu, then read and execute commands forever.

    Args:
        client: connected client handed to every command instance.
    """
    print(MENU)
    while True:
        choice = input('Insert Command$: ')
        command_type = CommandFactory.get_cmd(choice)
        if command_type:
            command_type(client).run()
        else:
            print('Unknown: {}'.format(choice))
class Client(asyncio.Protocol):
    """TCP client protocol that prints every reply from the server."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Keep the transport so data can be written later."""
        self.transport = transport

    def data_received(self, data):
        """Print the decoded server reply (shown as a repr)."""
        text = data.decode()
        print('Data received from server: \n{!r}'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()
def main():
    """Connect to the server, start the console menu and run the loop.

    The blocking menu is handed to the default executor so the event loop
    stays free to deliver ``data_received`` callbacks.
    """
    client = Client(loop_)
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    # Pass the callable and its argument separately: ``show_menu(client)``
    # would run the endless input loop right here, before the event loop
    # ever starts, so no reply from the server could be processed.
    # NOTE(review): commands now call ``transport.write`` from the worker
    # thread; asyncio objects are not thread-safe, so routing writes through
    # ``loop.call_soon_threadsafe`` is advisable -- confirm.
    loop_.run_in_executor(None, show_menu, client)
    try:
        loop_.run_forever()
    finally:
        loop_.close()


if __name__ == '__main__':
    main()
# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB
class ACommand:
    """Server-side handler for the ``X`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return ``db.a()`` rendered as ``key: value`` lines.

        A falsy result from ``db.a()`` yields a fixed "empty" banner instead.
        """
        rows = db.a()
        if rows:
            return '\n'.join('{}: {}'.format(*item) for item in rows.items())
        return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'
class BCommand:
    """Server-side handler for the ``Y`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Forward both parameters to ``db.b`` and return a fixed reply."""
        reply = 'B Ok!'
        db.b(param1, param2)
        return reply
class ExitCommand:
    """Server-side handler that shuts the whole server down."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Stop accepting connections and stop the event loop.

        This runs inside ``data_received`` while the loop is running, where
        ``loop.close()`` and ``loop.run_until_complete()`` both raise
        RuntimeError. Only request the stop here; the ``finally`` clause of
        the script entry point completes the shutdown once ``run_forever()``
        returns.

        Returns:
            The farewell text, so the caller can still answer the client.
        """
        farewell = 'Buona giornata!!!'
        server.close()
        loop.stop()
        print(farewell)
        return farewell
class CommandFactory:
    """Parse a ``NAME[:p1[:p2]]`` message and look up its command class."""

    _cmds = {'X': ACommand,
             'Y': BCommand,
             'EXIT': ExitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` into a command name and up to two params.

        Returns:
            ``(command_class_or_None, param1, param2)``. ``param2`` is only
            filled when the message has exactly three fields.
        """
        name, *args = cmd.split(':')
        if not args:
            first, second = None, None
        elif len(args) == 2:
            first, second = args
        else:
            first, second = args[0], None
        return cls._cmds.get(name), first, second
class Server(asyncio.Protocol):
    """Protocol that executes one command per received TCP message.

    ``db`` is a class attribute, so it is shared by every protocol instance.
    """

    db_filename = '../data/db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send its reply.

        An unrecognised command name gets an error reply instead of raising
        AttributeError on ``None.run`` inside the event-loop callback.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # A fresh Server protocol instance is built for every client connection.
    server = loop.run_until_complete(
        loop.create_server(Server, '127.0.0.1', 10888))
    # Keep serving until the user presses Ctrl+C.
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening, wait for the listener to close, release the loop.
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
更新: 解决方案是使用 aioconsole 包提供的异步输入函数 ainput。下面的代码运行良好。
# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB
class Command(metaclass=abc.ABCMeta):
    """Abstract base for the server-side command handlers."""

    # ``abc.abstractclassmethod`` is deprecated; stacking ``classmethod`` on
    # ``abstractmethod`` is the supported spelling. The signature now matches
    # the one every concrete command in this module implements.
    @classmethod
    @abc.abstractmethod
    def run(cls, db, param1=None, param2=None):
        """Execute the command against ``db`` and return the reply text.

        Raises:
            NotImplementedError: when reached through ``super()``.
        """
        raise NotImplementedError()
class XCommand(Command):
    """Server-side handler for the ``X`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return ``db.x()`` rendered as ``key: value`` lines.

        A falsy result from ``db.x()`` yields a fixed "empty" banner instead.
        """
        record = db.x()
        if record:
            return '\n'.join('{}: {}'.format(*pair) for pair in record.items())
        return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'
class YCommand(Command):
    """Server-side handler for the ``Y`` request."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Pass ``param1`` to ``db.y`` and return a confirmation text."""
        template = 'Operation Y OK: {}'
        db.y(param1)
        return template.format(param1)
class QuitCommand(Command):
    """Server-side handler for the ``DISCONNECT`` request."""

    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        """Return the fixed disconnect reply; no argument is used."""
        reply = 'Disconnected...'
        return reply
class CommandFactory:
    """Parse a ``NAME[:p1[:p2]]`` message and look up its command class."""

    _cmds = {'X': XCommand,
             'Y': YCommand,
             'DISCONNECT': QuitCommand}

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` into a command name and up to two params.

        Returns:
            ``(command_class_or_None, param1, param2)``. ``param2`` is only
            filled when the message has exactly three fields.
        """
        name, *args = cmd.split(':')
        if not args:
            first, second = None, None
        elif len(args) == 2:
            first, second = args
        else:
            first, second = args[0], None
        return cls._cmds.get(name), first, second
class Server(asyncio.Protocol):
    """Protocol that executes one command per received TCP message.

    ``db`` is a class attribute, so it is shared by every protocol instance.
    """

    db_filename = '../data/exercise.db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send its reply.

        An unrecognised command name gets an error reply instead of raising
        AttributeError on ``None.run`` inside the event-loop callback.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance.
    # The protocol class defined in this module is ``Server``; the former
    # ``RubricaServer`` name is not defined and raised NameError.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Close the server even when the loop ends because of an error
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
#dummydb.py
class DummyDB:
    """Database stub that returns canned data."""

    def __init__(self, fn):
        """Keep the file name; nothing is opened or read."""
        self.fn = fn

    def x(self):
        """Return a fixed three-field sample record."""
        return dict(
            field_a='55 tt TTYY 3334 gghyyujh',
            field_b='FF hhhnneeekk',
            field_c='00993342489048222 news',
        )

    def y(self, param):
        """Return ``param`` unchanged."""
        return param
# client.py
import abc
from asyncio import *
from aioconsole import ainput
# Menu text printed by menu() before the prompt loop starts.
MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''
# Module-level event loop, used by the script entry point below.
loop_ = get_event_loop()
class Command(metaclass=abc.ABCMeta):
    """Abstract base for the client-side menu commands."""

    # Subclasses whose ``run`` is a coroutine set this to True so the caller
    # knows the result must be awaited.
    asyn = False

    def __init__(self, tcp_client):
        """Keep the client used to send data to the server."""
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        """Execute the command; concrete subclasses must override this."""
        raise NotImplementedError
class ACommand(Command):
    """Menu entry A: ask the server to run its ``X`` command."""

    def run(self):
        """Send the ``X:`` request through the client."""
        request = 'X:'
        self.client.send_data_to_tcp(request)
class BCommand(Command):
    """Menu entry B: read extra input, then send the ``Y`` command."""

    # ``run`` is a coroutine, so the caller has to await it.
    asyn = True

    async def run(self):
        """Await one console line without blocking the loop; send ``Y:<line>``."""
        extra = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        self.client.send_data_to_tcp('Y:{}'.format(extra))
class QuitCommand(Command):
    """Menu entry C: notify the server, stop the loop and exit."""

    def run(self):
        """Send ``DISCONNECT:``, print a farewell, disconnect and exit."""
        tcp_client = self.client
        tcp_client.send_data_to_tcp('DISCONNECT:')
        print('Goodbye!!!')
        tcp_client.disconnect()
        exit()
class CommandFactory:
    """Map a menu letter typed by the user to its command class."""

    _cmds = {
        'A': ACommand,
        'B': BCommand,
        'C': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the class for ``cmd`` (whitespace-trimmed), or ``None``."""
        return cls._cmds.get(cmd.strip())
class Client(Protocol):
    """TCP client protocol that prints every reply from the server."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def disconnect(self):
        """Stop the event loop."""
        self.loop.stop()

    def connection_made(self, transport):
        """Keep the transport so data can be written later."""
        self.transport = transport

    def data_received(self, data):
        """Print the decoded server reply between two separator lines."""
        text = data.decode()
        print('Data received from server: \n===========\n{}\n===========\n'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()
def menu():
    """Print the command menu to the console."""
    print(MENU)
async def main():
    """Show the menu, then read and run commands without blocking the loop.

    Uses the module-level ``client`` created by the script entry point.
    """
    menu()
    while True:
        cmd = await ainput('Insert Command >')
        cmd_cls = CommandFactory.get_cmd(cmd)
        if not cmd_cls:
            print('Unknown: {}'.format(cmd))
            continue
        outcome = cmd_cls(client).run()
        if cmd_cls.asyn:
            # Coroutine-based commands return an awaitable from run().
            await outcome
if __name__ == '__main__':
    # ``client`` stays a module-level name because main() reads it.
    client = Client(loop_)
    # Connect first, then drive the interactive prompt coroutine.
    loop_.run_until_complete(
        loop_.create_connection(lambda: client, '127.0.0.1', 10888))
    loop_.run_until_complete(main())
您可以考虑使用 aioconsole.ainput:
from aioconsole import ainput
async def some_coroutine():
    """Await one line of console input via ``ainput``."""
    line = await ainput(">>> ")
    # Placeholder kept from the original snippet: further code goes here.
    [...]
该项目在 PyPI 可用。
另一种方法是使用 run_in_executor,
例如:
from functools import partial
from concurrent.futures.thread import ThreadPoolExecutor
async def f():
    """Read console lines forever, running ``input`` in a worker thread."""
    pool = ThreadPoolExecutor(1)
    # Pre-bind the executor so each call only needs the blocking function.
    rie = partial(asyncio.get_event_loop().run_in_executor, pool)
    while True:
        await rie(input)