Asyncio 测试使用 loop.call_soon_threadsafe 的函数

Asyncio testing a function which uses loop.call_soon_threadsafe

我有以下功能:

def send_command(self, cmd):
    self.loop.call_soon_threadsafe(
        functools.partial(
            self._transport.write, str(cmd).encode() + b"\n"
        )
    )

被测系统 (sut) 是 class 继承自 asyncio.Protocol 的系统,它向套接字上的硬件发送一些命令。我必须使用线程,因为这是 wxPython 下 GUI 的一部分。最后,如果我调用 self._transport.write,代码在 Linux 上运行良好,但在 Windows™ 上崩溃。

当运行测试时:

@pytest.mark.asyncio
async def test_send_command(self):
    self.sut._transport = Mock()
    self.sut.send_command("ook eek")
    assert self.sut._transport.write.called is True

我收到断言错误。 self.sut._transport.write 永远不会被调用。如果我直接在函数中调用 self._transport.write,代码会在 Windows™ 上崩溃,但测试正常通过。

我在这里错过了什么?

有人吗?...

当然,这不是极端情况……

解决方法……

reading and experimenting with even loops之后,使用这个:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

通过问题。当然,这意味着在 Windows 上使用次高效循环。 ☹ PHA!

欢迎大家有更好的解决办法,一些假网点。