在 Celery 任务中获取 Django 通道访问权限
Getting Django channel access in Celery task
我有一个 Django 应用程序,它使用 Channels 监听 WebSocket,并据此在 Celery 中启动后端任务。该任务目前只是休眠指定的时间,然后返回 True。
问题是我不知道如何在 Celery 任务中访问 WebSocket,以便在任务完成后通知 UI。
celery==4.3.0
channels==2.2.0
Django==2.2.4
django-celery-results==1.1.2
djangorestframework==3.10.2
我的tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def gotosleep(timeInSecs):
    """Block the worker for ``timeInSecs`` seconds, then report completion.

    Args:
        timeInSecs: Number of seconds to sleep; passed unchanged to
            ``time.sleep``.

    Returns:
        ``True`` once the sleep has finished.
    """
    time.sleep(timeInSecs)
    return True
我的consumer.py
from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import gotosleep
class AccessConsumer(WebsocketConsumer):
    """WebSocket consumer that queues a ``gotosleep`` task on request.

    Each text frame is expected to be a JSON object whose ``message`` value
    is a string of decimal digits giving the sleep time in seconds.
    """

    def connect(self):
        """Accept the incoming WebSocket connection."""
        self.accept()

    def disconnect(self, close_code):
        """Do nothing on close; this consumer holds no resources."""

    def receive(self, text_data=None, bytes_data=None):
        """Queue a sleep task for a numeric message, otherwise ask for one.

        Args:
            text_data: Raw text frame from the client, or ``None`` when the
                client sent a binary frame.
            bytes_data: Raw binary frame from the client. Accepted so that
                binary frames do not raise ``TypeError``; its content is
                not used.
        """
        try:
            message = json.loads(text_data)['message']
        except (TypeError, ValueError, KeyError):
            # Malformed JSON, a non-object payload, a missing key or a
            # binary frame is answered like any other invalid request.
            message = None

        # str.isdecimal() accepts only characters that int() can parse;
        # str.isnumeric() also accepts e.g. '½' or '²', for which int()
        # raises ValueError.
        if isinstance(message, str) and message.isdecimal():
            gotosleep.delay(int(message))
            reply = 'We are dealing with your request'
        else:
            reply = 'Give me a number'

        self.send(text_data=json.dumps({'message': reply}))
有什么想法吗?非常感谢
@normic:是的,我正在为后来在我的项目中添加通道层而苦苦挣扎:
@Ken4scholars:非常感谢您提供的链接。这些促使我找到了我要找的东西。
致其他苦苦挣扎的人:
我的tasks.py:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
@shared_task
def add(x, y):
    """Return ``x + y``."""
    return x + y
@shared_task
def go_to_sleep_and_add(x, y):
    """Add two numbers after a delay and broadcast the sum to a channel group.

    Sleeps for 10 seconds, computes ``int(x) + int(y)`` and sends the result
    to the ``task_group_a`` group as a message of type ``task_message``.

    Args:
        x: First operand; any value accepted by ``int()``.
        y: Second operand; any value accepted by ``int()``.

    Returns:
        The integer sum of ``x`` and ``y``.

    Raises:
        ValueError: If ``x`` or ``y`` cannot be converted by ``int()``.
    """
    time.sleep(10)
    result = int(x) + int(y)

    # The task body is synchronous while the channel layer API is
    # asynchronous, hence the async_to_sync wrapper around group_send.
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'task_group_a',
        {
            'type': 'task_message',
            'message': result,
        },
    )
    return result
@shared_task
def mul(x, y):
    """Return ``x * y``."""
    return x * y
@shared_task
def xsum(numbers):
    """Return the sum of the iterable ``numbers``."""
    return sum(numbers)
我的consumers.py:
from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import go_to_sleep_and_add
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
class AccessConsumer(WebsocketConsumer):
    """WebSocket consumer that queues additions and relays their results.

    On connect the socket joins the ``task_group_a`` group. Text frames are
    expected to be JSON of the form ``{"message": {"1": "<digits>",
    "2": "<digits>"}}``; valid requests are handed to
    ``go_to_sleep_and_add``. Group messages of type ``task_message`` are
    forwarded to the client.
    """

    def connect(self):
        """Accept the connection and join the task result group."""
        self.accept()
        self.room_group_name = "task_group_a"
        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name,
        )

    def disconnect(self, close_code):
        """Leave the task result group when the socket closes."""
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name,
        )

    def receive(self, text_data=None, bytes_data=None):
        """Queue an addition for two numeric operands, otherwise ask again.

        Args:
            text_data: Raw text frame from the client, or ``None`` when the
                client sent a binary frame.
            bytes_data: Raw binary frame from the client. Accepted so that
                binary frames do not raise ``TypeError``; its content is
                not used.
        """
        try:
            operands = json.loads(text_data)['message']
            num1 = operands['1']
            num2 = operands['2']
        except (TypeError, ValueError, KeyError):
            # Malformed JSON, an unexpected payload shape or a binary frame
            # is answered like any other invalid request.
            num1 = num2 = None

        # The task calls int() on both operands. str.isdecimal() accepts
        # only characters int() can parse, whereas str.isnumeric() also
        # accepts e.g. '½' or '²' and would let the task fail later.
        if all(isinstance(n, str) and n.isdecimal() for n in (num1, num2)):
            go_to_sleep_and_add.delay(num1, num2)
            reply = 'We are dealing with your request'
        else:
            reply = 'Give me numbers'

        self.send(text_data=json.dumps({'message': reply}))

    # Receive message from room group
    def task_message(self, event):
        """Forward a ``task_message`` group event to the WebSocket client.

        Args:
            event: Group message dict; its ``message`` value is relayed.
        """
        message = event['message']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
我的 html 页面在 Django/templates:
<!-- access/templates/access/room.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Access Room</title>
</head>
<body>
    <textarea id="access-log" cols="50" rows="5">Results </textarea><br/>
    Number 1:<input id="access-message-input" type="text" size="20"/><br/>
    Number 2:<input id="access-message-input2" type="text" size="20"/><br/>
    <input id="access-message-submit" type="button" value="Send"/>

    <!-- The script lives inside <body>, after the elements it looks up. -->
    <script>
        // NOTE(review): presumably a JSON-encoded string supplied by the
        // view; confirm it is escaped before being rendered here.
        var roomName = {{ room_name_json }};

        // Browsers block plain ws:// connections from pages served over
        // HTTPS, so follow the scheme of the page itself.
        var wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        var accessSocket = new WebSocket(
            wsScheme + window.location.host +
            '/ws/access/' + roomName + '/');

        // Append every message from the server to the log textarea.
        accessSocket.onmessage = function(e) {
            var data = JSON.parse(e.data);
            var message = data['message'];
            document.querySelector('#access-log').value += (message + '\n');
        };

        accessSocket.onclose = function(e) {
            console.error('Access socket closed unexpectedly');
        };

        // Send both inputs as {"message": {"1": ..., "2": ...}} and clear them.
        document.querySelector('#access-message-submit').onclick = function(e) {
            var messageInputDom = document.querySelector('#access-message-input');
            var messageInputDom2 = document.querySelector('#access-message-input2');
            // Declared with var so it does not leak as an implicit global.
            var numbers = { '1': messageInputDom.value, '2': messageInputDom2.value };
            accessSocket.send(JSON.stringify({
                'message': numbers
            }));
            messageInputDom.value = '';
            messageInputDom2.value = '';
        };
    </script>
</body>
</html>
我有一个 Django 应用程序,它使用 Channels 监听 WebSocket,并据此在 Celery 中启动后端任务。该任务目前只是休眠指定的时间,然后返回 True。
问题是我不知道如何在 Celery 任务中访问 WebSocket,以便在任务完成后通知 UI。
celery==4.3.0
channels==2.2.0
Django==2.2.4
django-celery-results==1.1.2
djangorestframework==3.10.2
我的tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def gotosleep(timeInSecs):
    """Block the worker for ``timeInSecs`` seconds, then report completion.

    Args:
        timeInSecs: Number of seconds to sleep; passed unchanged to
            ``time.sleep``.

    Returns:
        ``True`` once the sleep has finished.
    """
    time.sleep(timeInSecs)
    return True
我的consumer.py
from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import gotosleep
class AccessConsumer(WebsocketConsumer):
    """WebSocket consumer that queues a ``gotosleep`` task on request.

    Each text frame is expected to be a JSON object whose ``message`` value
    is a string of decimal digits giving the sleep time in seconds.
    """

    def connect(self):
        """Accept the incoming WebSocket connection."""
        self.accept()

    def disconnect(self, close_code):
        """Do nothing on close; this consumer holds no resources."""

    def receive(self, text_data=None, bytes_data=None):
        """Queue a sleep task for a numeric message, otherwise ask for one.

        Args:
            text_data: Raw text frame from the client, or ``None`` when the
                client sent a binary frame.
            bytes_data: Raw binary frame from the client. Accepted so that
                binary frames do not raise ``TypeError``; its content is
                not used.
        """
        try:
            message = json.loads(text_data)['message']
        except (TypeError, ValueError, KeyError):
            # Malformed JSON, a non-object payload, a missing key or a
            # binary frame is answered like any other invalid request.
            message = None

        # str.isdecimal() accepts only characters that int() can parse;
        # str.isnumeric() also accepts e.g. '½' or '²', for which int()
        # raises ValueError.
        if isinstance(message, str) and message.isdecimal():
            gotosleep.delay(int(message))
            reply = 'We are dealing with your request'
        else:
            reply = 'Give me a number'

        self.send(text_data=json.dumps({'message': reply}))
有什么想法吗?非常感谢
@normic:是的,我正在为后来在我的项目中添加通道层而苦苦挣扎:
@Ken4scholars:非常感谢您提供的链接。这些促使我找到了我要找的东西。
致其他苦苦挣扎的人:
我的tasks.py:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
@shared_task
def add(x, y):
    """Return ``x + y``."""
    return x + y
@shared_task
def go_to_sleep_and_add(x, y):
    """Add two numbers after a delay and broadcast the sum to a channel group.

    Sleeps for 10 seconds, computes ``int(x) + int(y)`` and sends the result
    to the ``task_group_a`` group as a message of type ``task_message``.

    Args:
        x: First operand; any value accepted by ``int()``.
        y: Second operand; any value accepted by ``int()``.

    Returns:
        The integer sum of ``x`` and ``y``.

    Raises:
        ValueError: If ``x`` or ``y`` cannot be converted by ``int()``.
    """
    time.sleep(10)
    result = int(x) + int(y)

    # The task body is synchronous while the channel layer API is
    # asynchronous, hence the async_to_sync wrapper around group_send.
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'task_group_a',
        {
            'type': 'task_message',
            'message': result,
        },
    )
    return result
@shared_task
def mul(x, y):
    """Return ``x * y``."""
    return x * y
@shared_task
def xsum(numbers):
    """Return the sum of the iterable ``numbers``."""
    return sum(numbers)
我的consumers.py:
from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import go_to_sleep_and_add
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
class AccessConsumer(WebsocketConsumer):
    """WebSocket consumer that queues additions and relays their results.

    On connect the socket joins the ``task_group_a`` group. Text frames are
    expected to be JSON of the form ``{"message": {"1": "<digits>",
    "2": "<digits>"}}``; valid requests are handed to
    ``go_to_sleep_and_add``. Group messages of type ``task_message`` are
    forwarded to the client.
    """

    def connect(self):
        """Accept the connection and join the task result group."""
        self.accept()
        self.room_group_name = "task_group_a"
        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name,
        )

    def disconnect(self, close_code):
        """Leave the task result group when the socket closes."""
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name,
        )

    def receive(self, text_data=None, bytes_data=None):
        """Queue an addition for two numeric operands, otherwise ask again.

        Args:
            text_data: Raw text frame from the client, or ``None`` when the
                client sent a binary frame.
            bytes_data: Raw binary frame from the client. Accepted so that
                binary frames do not raise ``TypeError``; its content is
                not used.
        """
        try:
            operands = json.loads(text_data)['message']
            num1 = operands['1']
            num2 = operands['2']
        except (TypeError, ValueError, KeyError):
            # Malformed JSON, an unexpected payload shape or a binary frame
            # is answered like any other invalid request.
            num1 = num2 = None

        # The task calls int() on both operands. str.isdecimal() accepts
        # only characters int() can parse, whereas str.isnumeric() also
        # accepts e.g. '½' or '²' and would let the task fail later.
        if all(isinstance(n, str) and n.isdecimal() for n in (num1, num2)):
            go_to_sleep_and_add.delay(num1, num2)
            reply = 'We are dealing with your request'
        else:
            reply = 'Give me numbers'

        self.send(text_data=json.dumps({'message': reply}))

    # Receive message from room group
    def task_message(self, event):
        """Forward a ``task_message`` group event to the WebSocket client.

        Args:
            event: Group message dict; its ``message`` value is relayed.
        """
        message = event['message']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
我的 html 页面在 Django/templates:
<!-- access/templates/access/room.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Access Room</title>
</head>
<body>
    <textarea id="access-log" cols="50" rows="5">Results </textarea><br/>
    Number 1:<input id="access-message-input" type="text" size="20"/><br/>
    Number 2:<input id="access-message-input2" type="text" size="20"/><br/>
    <input id="access-message-submit" type="button" value="Send"/>

    <!-- The script lives inside <body>, after the elements it looks up. -->
    <script>
        // NOTE(review): presumably a JSON-encoded string supplied by the
        // view; confirm it is escaped before being rendered here.
        var roomName = {{ room_name_json }};

        // Browsers block plain ws:// connections from pages served over
        // HTTPS, so follow the scheme of the page itself.
        var wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        var accessSocket = new WebSocket(
            wsScheme + window.location.host +
            '/ws/access/' + roomName + '/');

        // Append every message from the server to the log textarea.
        accessSocket.onmessage = function(e) {
            var data = JSON.parse(e.data);
            var message = data['message'];
            document.querySelector('#access-log').value += (message + '\n');
        };

        accessSocket.onclose = function(e) {
            console.error('Access socket closed unexpectedly');
        };

        // Send both inputs as {"message": {"1": ..., "2": ...}} and clear them.
        document.querySelector('#access-message-submit').onclick = function(e) {
            var messageInputDom = document.querySelector('#access-message-input');
            var messageInputDom2 = document.querySelector('#access-message-input2');
            // Declared with var so it does not leak as an implicit global.
            var numbers = { '1': messageInputDom.value, '2': messageInputDom2.value };
            accessSocket.send(JSON.stringify({
                'message': numbers
            }));
            messageInputDom.value = '';
            messageInputDom2.value = '';
        };
    </script>
</body>
</html>