Python 3 - 多个 AsyncIO 连接
Python 3 - multiple AsyncIO connections
我正在尝试学习如何在 Python 3.7 中使用 AsyncIO,但我仍然对其原理有些困惑。
我的目标是编写一个简单的聊天程序,但是我需要使用环形网络拓扑——一个节点只知道它的两个邻居。当消息被发送时,它通过节点传递,直到它再次到达发送者。这意味着每个节点基本上同时是客户端和服务器。
我还需要能够检测死节点,这样我的环才不会坏。
我认为每个节点为每个邻居建立一个单独的连接可能是一个很好的解决方案 -- successor
和 predecessor
。
class Node:
...
def run():
...
s = loop.create_connection(lambda: Client(...), addr1, port1)
p = loop.create_server(lambda: Server(...), addr2, port2)
successor = loop.run_until_complete(s)
predecessor = loop.run_until_complete(p)
loop.run_forever()
...
...
Server
和 Client
是实现 asyncio.Protocol
.
的 类
我想这样做的原因是,如果有消息通过圈子发送,它总是从 predecessor
发送到 successor
。在 predecessor
的 connection_lost
方法中,我可以检测到它已断开连接并向其 predecessor
发送一条消息(通过整个环)以连接到我。
我希望能够将从我的 predecessor
收到的消息进一步发送到我的 successor
。我还希望能够向我的 successor
发送一条包含我的地址的消息,以防我的 predecessor
死亡(此消息将从 predecessor
的 Server.connection_lost()
和会一直传给我死去的 predecessor
的 predecessor
).
我的问题是:我可以将收到的数据从predecessor
传递到successor
吗?如果不是,这个使用 AsyncIO 和环形拓扑的程序的更好实现是什么?
对于遇到同样问题的 AsyncIO 新手,我自己找到了解决方案。
首先,最好使用 AsyncIO 的高级方面 -- streams
。调用 loop.create_connction
和 loop.create_server
被认为是低级的(我一开始理解错了)。
create_connection
的高级替代方法是 asyncio.open_connection
,它将为您提供一个由 asyncio.StreamReader
和 asyncio.StreamWriter
组成的元组,您可以使用它来读取并写入打开的连接。当从 StreamReader
读取的数据等于 b''
或当您在尝试写入 [=21= 时捕获异常 (ConnectionError
) 时,您还可以检测到连接丢失].
create_server
的高级替代方法是 asyncio.start_server
,需要为其提供一个回调函数,每次与服务器建立连接(打开连接、接收数据)时都会调用该回调函数...)。回调以 StreamReader
和 StreamWriter
作为参数。也可以通过在写入 writer
.
时收到 b''
或 ConnectionError
来检测连接丢失
多个连接可以由协程处理。服务器部分可以有一个协程(它接受来自环形拓扑中一个邻居的连接)和一个客户端部分的协程(它打开到环中另一个邻居的连接)。 Node
class 可以是这样的:
import asyncio
class Node:
...
async def run(self):
...
self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
server_coro = asyncio.create_task(self.server_init())
client_coro = asyncio.create_task(self.client_method())
await client_coro
await server_coro
...
async def server_init(self):
server = await asyncio.start_server(self.server_callback, self.IP, self.port)
async with server:
await server.serve_forever()
async def client_method(self):
...
try:
data = await self.next_reader.read()
except ConnectionError:
...
...
请注意,我对协程使用 asyncio.create_task
和(不在代码清单中)asyncio.run(node.run())
,它们被认为是 asyncio.ensure_future()
和 [=33 的高级替代品=].这两个都是在 Python 3.7 中添加的,并且 asyncio.run()
据说是临时的,所以当你读到这篇文章时,它可能已经被其他东西取代了。
我不是 AsyncIO 专家,因此可能有更好、更简洁的方法来执行此操作(如果您知道,请分享)。
我正在尝试学习如何在 Python 3.7 中使用 AsyncIO,但我仍然对其原理有些困惑。
我的目标是编写一个简单的聊天程序,但是我需要使用环形网络拓扑——一个节点只知道它的两个邻居。当消息被发送时,它通过节点传递,直到它再次到达发送者。这意味着每个节点基本上同时是客户端和服务器。
我还需要能够检测死节点,这样我的环才不会坏。
我认为每个节点为每个邻居建立一个单独的连接可能是一个很好的解决方案 -- successor
和 predecessor
。
class Node:
...
def run():
...
s = loop.create_connection(lambda: Client(...), addr1, port1)
p = loop.create_server(lambda: Server(...), addr2, port2)
successor = loop.run_until_complete(s)
predecessor = loop.run_until_complete(p)
loop.run_forever()
...
...
Server
和 Client
是实现 asyncio.Protocol
.
我想这样做的原因是,如果有消息通过圈子发送,它总是从 predecessor
发送到 successor
。在 predecessor
的 connection_lost
方法中,我可以检测到它已断开连接并向其 predecessor
发送一条消息(通过整个环)以连接到我。
我希望能够将从我的 predecessor
收到的消息进一步发送到我的 successor
。我还希望能够向我的 successor
发送一条包含我的地址的消息,以防我的 predecessor
死亡(此消息将从 predecessor
的 Server.connection_lost()
和会一直传给我死去的 predecessor
的 predecessor
).
我的问题是:我可以将收到的数据从predecessor
传递到successor
吗?如果不是,这个使用 AsyncIO 和环形拓扑的程序的更好实现是什么?
对于遇到同样问题的 AsyncIO 新手,我自己找到了解决方案。
首先,最好使用 AsyncIO 的高级方面 -- streams
。调用 loop.create_connction
和 loop.create_server
被认为是低级的(我一开始理解错了)。
create_connection
的高级替代方法是 asyncio.open_connection
,它将为您提供一个由 asyncio.StreamReader
和 asyncio.StreamWriter
组成的元组,您可以使用它来读取并写入打开的连接。当从 StreamReader
读取的数据等于 b''
或当您在尝试写入 [=21= 时捕获异常 (ConnectionError
) 时,您还可以检测到连接丢失].
create_server
的高级替代方法是 asyncio.start_server
,需要为其提供一个回调函数,每次与服务器建立连接(打开连接、接收数据)时都会调用该回调函数...)。回调以 StreamReader
和 StreamWriter
作为参数。也可以通过在写入 writer
.
b''
或 ConnectionError
来检测连接丢失
多个连接可以由协程处理。服务器部分可以有一个协程(它接受来自环形拓扑中一个邻居的连接)和一个客户端部分的协程(它打开到环中另一个邻居的连接)。 Node
class 可以是这样的:
import asyncio
class Node:
...
async def run(self):
...
self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
server_coro = asyncio.create_task(self.server_init())
client_coro = asyncio.create_task(self.client_method())
await client_coro
await server_coro
...
async def server_init(self):
server = await asyncio.start_server(self.server_callback, self.IP, self.port)
async with server:
await server.serve_forever()
async def client_method(self):
...
try:
data = await self.next_reader.read()
except ConnectionError:
...
...
请注意,我对协程使用 asyncio.create_task
和(不在代码清单中)asyncio.run(node.run())
,它们被认为是 asyncio.ensure_future()
和 [=33 的高级替代品=].这两个都是在 Python 3.7 中添加的,并且 asyncio.run()
据说是临时的,所以当你读到这篇文章时,它可能已经被其他东西取代了。
我不是 AsyncIO 专家,因此可能有更好、更简洁的方法来执行此操作(如果您知道,请分享)。