无法让 websockets 测试通过 Django 通道

Unable to get websockets tests to pass in Django Channels

给定如下所示的 Django Channels 消费者:

class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", "hello")
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", "hello")

和如下所示的测试:

@pytest.mark.asyncio
class TestWebsockets:

    async def test_receives_data(self, settings):

        communicator = WebsocketCommunicator(
            application=application, path="/ws/notifications/"
        )
        connected, _ = await communicator.connect()
        assert connected
        await communicator.send_json_to({"type": "notify", "data": "who knows"})
        response = await communicator.receive_json_from()
        await communicator.disconnect()

当我 运行 测试时,我总是得到 TimeoutError。我需要做哪些不同的事情?

如果您想查看完整的回购示例,请查看 https://github.com/phildini/websockets-test

要测试异步通道代码,最好使用纯功能性异步测试。

@pytest.mark.asyncio
async def test_receives_data(settings):
    communicator = WebsocketCommunicator(
        application=application, path="/ws/notifications/"
    )
    connected, _ = await communicator.connect()
    assert connected
    await communicator.send_json_to({"type": "notify", "data": "who knows"})
    response = await communicator.receive_json_from()
    await communicator.disconnect()

pytest 可让您将这些与基于 class 的常规 Django 测试混合使用。

在这里你可以找到一些测试消费者的例子。

https://github.com/hishnash/djangochannelsrestframework/tree/master/tests

async_to_sync(self.channel_layer.group_add)("hello", "hello") 不应该是 async_to_sync(self.channel_layer.group_add)("hello", self.channel_name) 吗? 在第一种情况下,您将 "hello" 添加到组中,并且测试中的 communicator.receive_json_from() 将失败,因为测试客户端不会收到 group_send。

通过将 class 重构为:

class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", self.channel_name)
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", self.channel_name)

我可以从示例 repo pass 中获取测试