Unable to get websockets tests to pass in Django Channels
Given a Django Channels consumer that looks like the following:
class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", "hello")
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", "hello")
and a test that looks like the following:
@pytest.mark.asyncio
class TestWebsockets:
    async def test_receives_data(self, settings):
        communicator = WebsocketCommunicator(
            application=application, path="/ws/notifications/"
        )
        connected, _ = await communicator.connect()
        assert connected
        await communicator.send_json_to({"type": "notify", "data": "who knows"})
        response = await communicator.receive_json_from()
        await communicator.disconnect()
When I run the test, I always get a TimeoutError. What do I need to do differently?
If you'd like to see a full repo example, check out https://github.com/phildini/websockets-test
To test async Channels code, it is best to use plain async test functions rather than test classes:
@pytest.mark.asyncio
async def test_receives_data(settings):
    communicator = WebsocketCommunicator(
        application=application, path="/ws/notifications/"
    )
    connected, _ = await communicator.connect()
    assert connected
    await communicator.send_json_to({"type": "notify", "data": "who knows"})
    response = await communicator.receive_json_from()
    await communicator.disconnect()
pytest lets you mix these with regular class-based Django tests.
You can find some examples of testing consumers here:
https://github.com/hishnash/djangochannelsrestframework/tree/master/tests
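Shouldn't
    async_to_sync(self.channel_layer.group_add)("hello", "hello")
be
    async_to_sync(self.channel_layer.group_add)("hello", self.channel_name)
instead?
In the first case you are adding the channel name "hello" to the group rather than the consumer's own channel, so communicator.receive_json_from() in the test fails: the test client never receives the group_send.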
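The routing problem described above can be illustrated with a toy model. This is only a sketch: ToyChannelLayer, delivered_to_consumer and the channel name "consumer-channel-1" are hypothetical names invented for illustration and are not part of Channels, and the real channel layer is considerably more involved. The sketch only assumes that a group is a set of channel names and that group_send delivers a copy of the message to each registered name.

```python
from collections import defaultdict


class ToyChannelLayer:
    """Simplified stand-in for a channel layer; NOT the real Channels API."""

    def __init__(self):
        self.groups = defaultdict(set)      # group name -> channel names
        self.mailboxes = defaultdict(list)  # channel name -> queued messages

    def group_add(self, group, channel):
        self.groups[group].add(channel)

    def group_send(self, group, message):
        # Deliver a copy of the message to every channel registered in the group.
        for channel in self.groups[group]:
            self.mailboxes[channel].append(message)


def delivered_to_consumer(registered_channel, consumer_channel="consumer-channel-1"):
    """Return what lands in the consumer's mailbox after one group_send."""
    layer = ToyChannelLayer()
    layer.group_add("hello", registered_channel)
    layer.group_send("hello", {"type": "chat.message", "content": "hello"})
    return layer.mailboxes[consumer_channel]


# Registering the literal string "hello": the message goes to a channel nobody reads.
buggy = delivered_to_consumer("hello")
# Registering the consumer's own channel name: the message reaches the consumer.
fixed = delivered_to_consumer("consumer-channel-1")
print(buggy)
print(fixed)
```

In this model the consumer's mailbox stays empty in the first case, which mirrors why the test client has nothing to read.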
After refactoring the class to:
class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", self.channel_name)
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", self.channel_name)
I was able to get the tests from the example repo to pass.
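As a rough illustration of why the original version showed up as a TimeoutError in particular, here is a small asyncio sketch. It does not use Channels at all: receive_with_timeout, simulate and the channel name "consumer-channel-1" are hypothetical, and the queues merely stand in for per-channel mailboxes. The assumption is that the test client waits, with a timeout, for something to arrive on the consumer's own channel.

```python
import asyncio


async def receive_with_timeout(mailbox, timeout=0.1):
    """Wait for one message; return None if nothing arrives in time."""
    try:
        return await asyncio.wait_for(mailbox.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def simulate(registered_channel):
    consumer_channel = "consumer-channel-1"  # hypothetical channel name
    mailboxes = {consumer_channel: asyncio.Queue(), "hello": asyncio.Queue()}
    # A group send only reaches the channel that was registered in the group.
    await mailboxes[registered_channel].put(
        {"type": "chat.message", "content": "hello"}
    )
    # The client only ever sees what reaches the consumer's own channel.
    return await receive_with_timeout(mailboxes[consumer_channel])


buggy_result = asyncio.run(simulate("hello"))
fixed_result = asyncio.run(simulate("consumer-channel-1"))
print(buggy_result)
print(fixed_result)
```

With the literal "hello" registered, the wait expires with nothing received; with the consumer's own channel registered, the message arrives before the timeout.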