从视图访问消费者以通过 WebSocket 将 api-接收到的数据推送给用户(Django-Channelsv2.2 和 DRF)
Access Consumer from View to push api-received data to User via WebSocket (Django-Channelsv2.2 & DRF)
我发现自己正在努力寻找一种方法将数据从视图(API查看 Django-rest-framework)或分布式任务(Celery/RabbitMQ 直接推送到消费者(Django Channels v.2.2) )
我真的很感激任何允许我从 View 访问消费者的代码示例,因为触发器本身是输入设备,而不是网络,所以我可以从它进行 API 调用,所以我需要连接器。
Django Channels v.2.2、DjangoChannelsRestFramework,主要是此处描述的所有案例:https://channels.readthedocs.io/en/latest/community.html 以及相关问题@Whosebug
consumers.py
class BasicConsumer(AsyncConsumer):
async def websocket_connect(self, event):
print('connected', event)
await self.send({
"type": "websocket.accept"
})
# await self.send({
# "type": "websocket.send",
# "text": "Hello world"
# })
async def websocket_receive(self, event):
print('receive', event)
front_text = event.get('text', None)
if front_text is not None:
loaded_dict_data = json.loads(front_text)
msg = loaded_dict_data.get('message')
print(msg)
user = self.scope['user']
if user.is_authenticated:
username = user.username
myResponse = {
'message': msg,
'username': username
}
await self.send({
"type": "websocket.send",
"text": json.dumps(myResponse)
})
async def websocket_disconnect(self, event):
print('disconnect', event)
@database_sync_to_async
def get_thread(self, user, other_username):
return Thread.objects.get_or_new(user, other_username)[0]
views.py
class BasicView(APIView):
def post(self, request):
serializer = BasicViewSerializer(data=request.data)
if serializer.is_valid():
triggerConsumer(serializer.validated_data)
我认为定义一个通道名称并向其发送数据会更好。
from asgiref.sync import async_to_sync
async_to_sync(channel_layer.send)("channel_name", {...})
您可以在此处查看完整文档doc
我发现自己正在努力寻找一种方法将数据从视图(API查看 Django-rest-framework)或分布式任务(Celery/RabbitMQ 直接推送到消费者(Django Channels v.2.2) )
我真的很感激任何允许我从 View 访问消费者的代码示例,因为触发器本身是输入设备,而不是网络,所以我可以从它进行 API 调用,所以我需要连接器。
Django Channels v.2.2、DjangoChannelsRestFramework,主要是此处描述的所有案例:https://channels.readthedocs.io/en/latest/community.html 以及相关问题@Whosebug
consumers.py
class BasicConsumer(AsyncConsumer):
async def websocket_connect(self, event):
print('connected', event)
await self.send({
"type": "websocket.accept"
})
# await self.send({
# "type": "websocket.send",
# "text": "Hello world"
# })
async def websocket_receive(self, event):
print('receive', event)
front_text = event.get('text', None)
if front_text is not None:
loaded_dict_data = json.loads(front_text)
msg = loaded_dict_data.get('message')
print(msg)
user = self.scope['user']
if user.is_authenticated:
username = user.username
myResponse = {
'message': msg,
'username': username
}
await self.send({
"type": "websocket.send",
"text": json.dumps(myResponse)
})
async def websocket_disconnect(self, event):
print('disconnect', event)
@database_sync_to_async
def get_thread(self, user, other_username):
return Thread.objects.get_or_new(user, other_username)[0]
views.py
class BasicView(APIView):
def post(self, request):
serializer = BasicViewSerializer(data=request.data)
if serializer.is_valid():
triggerConsumer(serializer.validated_data)
我认为定义一个通道名称并向其发送数据会更好。
from asgiref.sync import async_to_sync
async_to_sync(channel_layer.send)("channel_name", {...})
您可以在此处查看完整文档doc