跨多个长期客户端连接的异步
asyncio across multiple long lived client connections
我正在尝试编写一个简单的应用程序,该应用程序连接到多个具有长期连接的基本 TCP 服务器,其中我 send/receive 来自每个连接的数据。基本上从服务器接收事件,发回命令并从命令接收结果。想控制一个设备,但我想在单个线程上控制 N 个设备。
我有一个工作的非异步、非阻塞实现,但是 time.sleep() 正在扼杀响应能力或扼杀 CPU,并且使用 select
太多了冷却器。
给出下面的示例,我想连接到所有三台服务器,并且同时在每台服务器上连接 await receive_message
。目前,它在 connect
的 receive_message() 中被阻止,所以我只得到这个输出:
Connecting to 1
Sending password to 1
Waiting for message from 1
我想得到与此类似的东西,不完全是,但表明连接都是独立安排的。
Connecting to 1
Connecting to 2
Connecting to 3
Sending password to 1
Sending password to 2
Sending password to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Connected to 1
Connected to 2
Connected to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
我正在尝试的淡化版本。不,真正的服务器并没有这么不安全....这只是一个例子。
import asyncio
class Connection:
def __init__(self, name, host, port):
self.name = name
self.host = host
self.port = port
self.reader, self.writer = None, None
self.connected = False
async def connect(self):
print(f'Connecting to {self.name}')
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
await self.send_message('password')
response = await self.receive_message()
if response == 'SUCCESS':
self.connected = True
print(f'Connected to {self.name}')
else:
raise Exception('unsuccessful connection')
print(f'Connected to {self.name}')
async def send_message(self, message):
print(f'Sending {message} to {self.name}')
self.writer.write(f'{message}\n'.encode('utf-8'))
async def receive_message(self):
print(f'Waiting for message from {self.name}')
return (await self.reader.readline()).decode('utf-8')
connections = (
Connection(1, 'localhost', 21114),
Connection(2, 'localhost', 21115),
Connection(3, 'localhost', 21116)
)
async def run():
for connection in connections:
await connection.connect()
# how to receive messages from each connection as they are received
for connection in connections:
await connection.receive_message()
asyncio.run(run())
for
循环中的 await
正在有效地序列化您的连接。在 asyncio 中,并行发生在 task 级别,所以如果你想并行连接到 运行,你需要生成多个任务(或使用一个函数来完成它对你来说,例如 asyncio.gather
)。例如:
async def handle_connection(conn):
await conn.connect()
await conn.receive_message()
# ...
async def run():
tasks = []
for conn in connections:
tasks.append(asyncio.create_task(handle_connection(conn)))
# wait until all the tasks finish
await asyncio.gather(*tasks)
我正在尝试编写一个简单的应用程序,该应用程序连接到多个具有长期连接的基本 TCP 服务器,其中我 send/receive 来自每个连接的数据。基本上从服务器接收事件,发回命令并从命令接收结果。想控制一个设备,但我想在单个线程上控制 N 个设备。
我有一个工作的非异步、非阻塞实现,但是 time.sleep() 正在扼杀响应能力或扼杀 CPU,并且使用 select
太多了冷却器。
给出下面的示例,我想连接到所有三台服务器,并且同时在每台服务器上连接 await receive_message
。目前,它在 connect
的 receive_message() 中被阻止,所以我只得到这个输出:
Connecting to 1
Sending password to 1
Waiting for message from 1
我想得到与此类似的东西,不完全是,但表明连接都是独立安排的。
Connecting to 1
Connecting to 2
Connecting to 3
Sending password to 1
Sending password to 2
Sending password to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Connected to 1
Connected to 2
Connected to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
我正在尝试的淡化版本。不,真正的服务器并没有这么不安全....这只是一个例子。
import asyncio
class Connection:
def __init__(self, name, host, port):
self.name = name
self.host = host
self.port = port
self.reader, self.writer = None, None
self.connected = False
async def connect(self):
print(f'Connecting to {self.name}')
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
await self.send_message('password')
response = await self.receive_message()
if response == 'SUCCESS':
self.connected = True
print(f'Connected to {self.name}')
else:
raise Exception('unsuccessful connection')
print(f'Connected to {self.name}')
async def send_message(self, message):
print(f'Sending {message} to {self.name}')
self.writer.write(f'{message}\n'.encode('utf-8'))
async def receive_message(self):
print(f'Waiting for message from {self.name}')
return (await self.reader.readline()).decode('utf-8')
connections = (
Connection(1, 'localhost', 21114),
Connection(2, 'localhost', 21115),
Connection(3, 'localhost', 21116)
)
async def run():
for connection in connections:
await connection.connect()
# how to receive messages from each connection as they are received
for connection in connections:
await connection.receive_message()
asyncio.run(run())
for
循环中的 await
正在有效地序列化您的连接。在 asyncio 中,并行发生在 task 级别,所以如果你想并行连接到 运行,你需要生成多个任务(或使用一个函数来完成它对你来说,例如 asyncio.gather
)。例如:
async def handle_connection(conn):
await conn.connect()
await conn.receive_message()
# ...
async def run():
tasks = []
for conn in connections:
tasks.append(asyncio.create_task(handle_connection(conn)))
# wait until all the tasks finish
await asyncio.gather(*tasks)