如果我使用缓存的 gRPC 通道,pytest 将失败,仅在多个测试中

pytest fails if I use cached gRPC Channel, only in multiple tests

总结

我正在制作与其他微服务服务器交互的客户端服务。

通过使用 gRPC 通道,此客户端从服务器获取一些数据。连接使用频繁,参与者固定,所以我重用gRPC Channel和Stub来降低通道创建成本。

此服务对每个请求都表现良好,并且每个测试一次单独运行。然而在多次测试中,我发现只有一个测试成功,另一个会失败 TimeOutError(gRPC status -DEADLINE_EXCEEDED) 或停止。

有趣的是,当我删除通道缓存(@lru_cache)或为会话(或模块)范围添加 pytest event_loop fixture override 时,这个问题就解决了。我在

中找到了第二种方法

为什么会这样?是什么让我的测试停止或失败?估计跟事件循环有关,具体情况不详


最小可重现示例 (MRE)

# mre.py
from functools import lru_cache

from grpclib.client import Channel

from config.config import current_config
from custom.protobuf.service import OtherServiceStub


@lru_cache(maxsize=None)
def get_cached_client() -> OtherServiceStub:
    host, port = '127.0.0.1', 50051
    channel = Channel(host, port)
    cached_client = OtherServiceStub(channel)
    return cached_client

async def get_data(arg1: str = None):
    client = get_cached_client()
    data = client.get_detail_data(arg1='foo')
    return data
# test_mre.py

@pytest.mark.asyncio
async def test_1(): # SUCCEED
    client = get_cached_client()
    await client.get_data(arg1='foo')


@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
    client = get_cached_client()
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
    client = get_cached_client()
    await client.get_data(arg1='something')
# solved if(1)
# not cached
def get_cached_client() -> OtherServiceStub:
    host, port = '127.0.0.1', 50051
    channel = Channel(host, port)
    cached_client = OtherServiceStub(channel)
    return cached_client

# solved if(2)
# override event_loop fixture
@pytest.fixture(scope="session")
def event_loop(request):
    """Create an instance of the default event loop for each test case."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

环境

pytest==6.2.4
pytest-asyncio==0.15.1
grpcio==1.37.1
grpcio-tools==1.37.1
grpclib==0.4.1
protobuf==3.15.8
betterproto==1.2.5

我发现这个问题源于 gRPC Channel 和 pytest event loop fixture 的实现。

lru_cache returns 如果调用 same 函数则缓存结果。 'same' 意味着如果函数被相同的输入(参数)调用。询问的缓存函数没有参数,所以如果你调用这个函数,除了第一次调用之外,你会得到与之前调用完全相同的结果。所以,你在测试代码中的grpc频道都是完全相同的频道。

# test_mre.py

@pytest.mark.asyncio
async def test_1(): 
    client = get_cached_client() # FIRST CALL - Create Channel & Stub object in here
    await client.get_data(arg1='foo')


@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
    client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
    client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
    await client.get_data(arg1='something')

那么,为什么复用的通道不能正常使用呢?问题出在 pytest-asyncio 装饰器中。

@pytest.mark.asyncio 创建新的事件循环,并在函数完成后关闭它所应用的每个函数。默认的事件循环范围是 function。您可以在 implementation of event loop fixture in pytest-asyncio.

中看到这一点

Python gRPC Channel 对象注册创建它的事件循环环境,当事件循环关闭时,Channel 关闭。在问的例子中,它是test_1函数事件循环。当您调用同一个通道并尝试在 test_2 函数中使用它时,test_1 事件循环已经关闭,因此通道已关闭(running=Falseclosed=True)。这意味着 await 请求将永远得不到响应。

@pytest.mark.asyncio
async def test_1(): 
    client = get_cached_client()
    await client.get_data(arg1='foo')
    # At this point, event loop and the channel is closed.


@pytest.mark.asyncio
async def test_2(): 
    client = get_cached_client() # Calling closed channel
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): 
    client = get_cached_client() # Calling closed channel
    await client.get_data(arg1='something')

所以这就是第一次测试成功而其他测试失败的原因。只有在第一个事件循环中,通道才有效。如果您设置了 timeout 参数,那么测试将失败,因为您无法在超时限制内从 gRPC 服务器获得响应(无论多么足够)。如果没有,您会看到所有其他测试都已停止,因为 python gRPC 通道没有默认超时限制。

您的两个解决方案可以解决此问题。首先,如果 Channel 对象没有缓存,那么每个测试函数都会创建自己的通道,事件循环问题就解决了。其次,如果您在会话范围内设置默认事件循环,则可以在所有测试函数中重用您的默认事件循环装置。所以 Channel 对象不会被关闭(因为它的事件循环没有关闭)。