如何在单元测试结束前取消我的异步任务?

How can I cancel my asyncio Task before the unit test ends?

我正在尝试对异步套接字服务器进行单元测试,并使用 pytest-asyncio 使 pytest 与异步代码库兼容。服务器一旦启动,总是可以通过 while 循环发送回复,并且可能大部分时间都在等待 client_loop() 中的传入消息。问题是在单元测试框架终止事件循环并发出此警告之前无法取消此任务:

Task was destroyed but it is pending!

task: < Task pending coro=< Server.new_client() done, defined at /[...path...]/server.py:16> wait_for=< Future pending cb=[< TaskWakeupMethWrapper object at 0x106d7cbe8>()]>>

我似乎可以访问的唯一任务是由 asyncio.create_task() 创建的任务,这似乎不是同一个任务。该任务如下所示:

task: < Task pending coro=< start_server() running at /usr/local/Cellar/python/[...different path...]/streams.py:86>>

因此在此任务上调用 task.cancel(); await task.wait_cancelled() 无效。

如何编写此单元测试以干净地启动并为每个测试启动服务器而不中断可能仍处于 运行ning 的任务?

示例如下:

test_server.py

import pytest
import asyncio

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    the_server.stop()

@pytest.mark.asyncio
async def test_connect(server):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
    writer.write(b'something')
    await reader.read(100)
    writer.write(b'something else')
    await reader.read(100)
    assert 1

server.py

import asyncio

class Server():
    async def start(self):
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
        task = loop.create_task(coro)
        print('\n')
        print(task)
        self.server = await task

    def stop(self):
        self.server.close()

    async def new_client(self, reader, writer):
        await self.client_loop(reader, writer)

    async def client_loop(self, reader, writer):
        while True:
            await reader.read(100)
            writer.write(b'reply')

如果你想 运行 这个例子只是 运行 pip3 install pytest-asyncio 并且 pytest 可以选择这个插件。

您必须在调用 self.server.close()await self.server.wait_closed()

因此您的灯具应如下所示:

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    await the_server.stop()

Serverstop 方法应该如下所示:

    async def stop(self):
        self.server.close()
        await self.server.wait_closed()

有关详细信息,请参阅 the documentation

asyncio.Server.stop() 方法不会完全停止服务器。它只是停止接受新连接。在关闭之前创建的任何连接将继续执行直到完成。

根据the documentation (强调我的)

Stop serving: close listening sockets and set the sockets attribute to None.

The sockets that represent existing incoming client connections are left open.

The server is closed asynchronously, use the wait_closed() coroutine to wait until the server is closed.

在此示例中,所有连接都发送到无限 client_loop 方法。

一个更好的解决方案是在new_client()中创建一个任务集合,负责执行client_loop()逻辑,而不是直接等待方法。使用这种方法,所有打开的任务都可以在 stop() 方法中干净地终止。