跨多个长期客户端连接的异步

asyncio across multiple long lived client connections

我正在尝试编写一个简单的应用程序,该应用程序连接到多个具有长期连接的基本 TCP 服务器,其中我 send/receive 来自每个连接的数据。基本上从服务器接收事件,发回命令并从命令接收结果。想控制一个设备,但我想在单个线程上控制 N 个设备。

我有一个工作的非异步、非阻塞实现,但是 time.sleep() 正在扼杀响应能力或扼杀 CPU,并且使用 select 太多了冷却器。

给出下面的示例,我想连接到所有三台服务器,并且同时在每台服务器上连接 await receive_message。目前,它在 connect 的 receive_message() 中被阻止,所以我只得到这个输出:

Connecting to 1
Sending password to 1
Waiting for message from 1

我想得到与此类似的东西,不完全是,但表明连接都是独立安排的。

Connecting to 1
Connecting to 2
Connecting to 3
Sending password to 1
Sending password to 2
Sending password to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Connected to 1
Connected to 2
Connected to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3

我正在尝试的淡化版本。不,真正的服务器并没有这么不安全....这只是一个例子。

import asyncio


class Connection:
    def __init__(self, name, host, port):
        self.name = name
        self.host = host
        self.port = port
        self.reader, self.writer = None, None
        self.connected = False

    async def connect(self):
        print(f'Connecting to {self.name}')
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
        await self.send_message('password')
        response = await self.receive_message()
        if response == 'SUCCESS':
            self.connected = True
            print(f'Connected to {self.name}')
        else:
            raise Exception('unsuccessful connection')
        print(f'Connected to {self.name}')

    async def send_message(self, message):
        print(f'Sending {message} to {self.name}')
        self.writer.write(f'{message}\n'.encode('utf-8'))

    async def receive_message(self):
        print(f'Waiting for message from {self.name}')
        return (await self.reader.readline()).decode('utf-8')


connections = (
    Connection(1, 'localhost', 21114),
    Connection(2, 'localhost', 21115),
    Connection(3, 'localhost', 21116)
)


async def run():
    for connection in connections:
        await connection.connect()
    # how to receive messages from each connection as they are received
    for connection in connections:
        await connection.receive_message()

asyncio.run(run())

for 循环中的 await 正在有效地序列化您的连接。在 asyncio 中,并行发生在 task 级别,所以如果你想并行连接到 运行,你需要生成多个任务(或使用一个函数来完成它对你来说,例如 asyncio.gather)。例如:

async def handle_connection(conn):
    await conn.connect()
    await conn.receive_message()
    # ...

async def run():
    tasks = []
    for conn in connections:
        tasks.append(asyncio.create_task(handle_connection(conn)))
    # wait until all the tasks finish
    await asyncio.gather(*tasks)