python 的 asyncio 是否支持基于协程的 API UDP 网络?

Does asyncio from python support coroutine-based API for UDP networking?

今晚我正在浏览 python asyncio 模块文档,为我的一个课程项目寻找一些想法,但我很快发现 [=71] 中可能缺少一些功能=]的标准aysncio模块。

如果您查看文档,您会发现有一个基于回调的 API 和一个基于协程的 API。并且回调 API 可用于构建 UDP 和 TCP 应用程序,而协程 API 看起来只能用于构建 TCP 应用程序,因为它利用流式 API.

这对我来说是个问题,因为我一直在为 UDP 网络寻找基于协程的 API,尽管我确实发现 asyncio 支持基于协程的低级套接字方法,例如 sock_recv and sock_sendall,但是 UDP 网络的关键 APIs recvfromsendto 不存在。

我想做的是编写一些代码,例如:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    # data handling here...
    await sock.sendto(addr, response)

我知道这可以使用回调 API 等效地实现,但这里的问题是回调不是协程而是常规函数,因此在其中您不能将控制权交还给事件循环并且保留函数执行状态。

看看上面的代码,如果我们需要在数据处理部分做一些阻塞IO操作,只要我们的IO操作在协程中完成,我们在协程版本中就不会有问题,因为嗯:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    async with aiohttp.ClientSession() as session:
        info = await session.get(...)
    response = generate_response_from_info(info)
    await sock.sendto(addr, response)

只要我们使用 await,事件循环就会从该点获取控制流来处理其他事情,直到该 IO 完成。但遗憾的是,这些代码目前 可用,因为我们在 asyncio.[=37 中没有 socket.sendtosocket.recvfrom 的协程版本=]

我们可以在其中实现的是使用传输协议回调 API:

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        self.transport = transport

    def data_received(self, data):
        info = requests.get(...)
        response = generate_response_from_info(info)
        self.transport.write(response)
        self.transport.close()

我们不能 await 协程,因为回调不是协程,并且像上面那样使用阻塞 IO 调用会停止回调中的控制流并阻止循环处理任何其他事件,直到 IO完成

另一个推荐的实现思路是在data_received函数中创建一个Future对象,将其添加到事件循环中,并在协议class中存储任何需要的状态变量,然后显式 return 控制循环。虽然这可行,但它确实会创建很多复杂的代码,而在协程版本中,它们根本不需要。

还有 here 我们有一个使用非阻塞套接字和 add_reader 处理 UDP 套接字的例子。但是和coroutine-version的几行代码相比,代码看起来还是很复杂。

我想说的是,协程是一个非常好的设计,可以在一个线程中利用并发的力量,同时也有一个非常简单的设计模式,可以节省脑力和不必要的代码行,但是在我们的 asyncio 标准库中确实缺少使其适用于 UDP 网络的关键部分。

大家对此有何看法?

此外,如果对支持这种 API UDP 网络的第 3 方库有任何其他建议,我将非常感谢我的课程项目。我发现Bluelet很像这样的东西但是好像没有主动维护

编辑:

似乎这个 PR 确实实现了这个功能,但被 asyncio 开发人员拒绝了。开发人员声称所有功能都可以使用 create_datagram_endpoint()、协议传输 API 来实现。但正如我上面所讨论的,协程 API 在许多用例中与使用回调 API 相比具有简单的力量,很遗憾我们在 UDP 中没有这些。

未提供基于流的 API 的原因是因为流在回调之上提供 ordering,而 UDP 通信本质上是无序的,所以这两个根本不兼容。

但是 none 这意味着您不能从回调中调用协程 - 事实上这很容易!从 EchoServerProtocol example 开始,您可以这样做:

def datagram_received(self, data, addr):
    loop = asyncio.get_event_loop()
    loop.create_task(self.handle_income_packet(data, addr))

async def handle_income_packet(self, data, addr):
    # echo back the message, but 2 seconds later
    await asyncio.sleep(2)
    self.transport.sendto(data, addr)

此处 datagram_received 启动您的 handle_income_packet 协程,它可以自由等待任意数量的协程。由于协程在 "background" 中运行,事件循环在任何时候都不会被阻塞,并且 datagram_received returns 立即,正如预期的那样。

您可能对this module providing high-level UDP endpoints for asyncio感兴趣:

async def main():
    # Create a local UDP enpoint
    local = await open_local_endpoint('localhost', 8888)

    # Create a remote UDP enpoint, pointing to the first one
    remote = await open_remote_endpoint(*local.address)

    # The remote endpoint sends a datagram
    remote.send(b'Hey Hey, My My')

    # The local endpoint receives the datagram, along with the address
    data, address = await local.receive()

    # Print: Got 'Hey Hey, My My' from 127.0.0.1 port 50603
    print(f"Got {data!r} from {address[0]} port {address[1]}")

asyncudp 在 asyncio 中提供易于使用的 UDP 套接字。

这是一个例子:

import asyncio
import asyncudp

async def main():
    sock = await asyncudp.create_socket(remote_addr=('127.0.0.1', 9999))
    sock.sendto(b'Hello!')
    print(await sock.recvfrom())
    sock.close()

asyncio.run(main())