Celery+Django -- 使用 Django 消息框架轮询任务状态并报告成功或失败
Celery+Django -- Poll task for state and report success or failure using Django messages framework
在我使用 Celery 的 Django 项目中(以及许多其他项目),我有一个 Celery 任务会在后台将文件上传到数据库。我使用轮询来跟踪上传进度并显示上传进度条。以下是一些详细介绍上传过程的片段:
views.py:
from .tasks import upload_task
...
upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings
tasks.py:
from taskman.celery import app, DBTask # taskman is the name of the Django app that has celery.py
from celery import task, current_task
@task(base=DBTask)
def upload_task(datapoints, user, description):
from utils.db.databaseinserter import insertIntoDatabase
for count in insertIntoDatabase(datapoints, user, description):
percent_completion = int(100 * (float(count) / float(len(datapoints))))
current_task.update_state(state='PROGRESS', meta={'percent':percent_completion})
databaseinserter.py:
def insertIntoDatabase(datapoints, user, description):
# iterate through the datapoints and upload them one by one
# at the end of an iteration, yield the number of datapoints completed so far
上传代码一切正常,进度条也正常。但是,我不确定如何发送 Django 消息告诉用户上传已完成(或者,如果出现错误,发送 Django 消息通知用户错误)。上传开始时,我在 views.py:
中执行此操作
from django.contrib import messages
...
messages.info(request, "Upload is in progress")
我想在上传成功后做这样的事情:
messages.info(request, "Upload successful!")
我无法在 views.py 中执行此操作,因为 Celery 任务是即刻即弃。在 celery.py 中有没有办法做到这一点?在我的 DBTask
class in celery.py 中,我定义了 on_success
和 on_failure
,那么我可以从那里发送 Django 消息吗?
此外,虽然我的投票在技术上可行,但目前并不理想。目前轮询的工作方式是它会无休止地检查任务,而不管任务是否在进行中。它很快就会淹没服务器控制台日志,我可以想象它会对整体性能产生负面影响。我对编写轮询代码还很陌生,所以我不完全确定最佳实践,以及如何仅在需要时进行轮询。处理不断轮询和服务器日志堵塞的最佳方法是什么?下面是我的投票代码。
views.py:
def poll_state(request):
data = 'Failure'
if request.is_ajax():
if 'task_id' in request.POST.keys() and request.POST['task_id']:
task_id = request.POST['task_id']
task = AsyncResult(task_id)
data = task.result or task.state
if data == 'SUCCESS' or data == 'FAILURE': # not sure what to do here; what I want is to exit the function early if the current task is already completed
return HttpResponse({}, content_type='application/json')
else:
data ='No task_id in the request'
logger.info('No task_id in the request')
else:
data = 'Not an ajax request'
logger.info('Not an ajax request')
json_data = json.dumps(data)
return HttpResponse(json_data, content_type='application/json')
以及对应的jQuery代码:
{% if task_id %}
jQuery(document).ready(function() {
var PollState = function(task_id) {
jQuery.ajax({
url: "poll_state",
type: "POST",
data: "task_id=" + task_id,
}).done(function(task) {
if (task.percent) {
jQuery('.bar').css({'width': task.percent + '%'});
jQuery('.bar').html(task.percent + '%');
}
else {
jQuery('.status').html(task);
};
PollState(task_id);
});
}
PollState('{{ task_id }}');
})
{% endif %}
(最后两个片段主要来自之前关于 Django+Celery 进度条的 Whosebug 问题。)
减少日志记录和开销的最简单方法是在下一次 PollState
调用时设置超时。您的函数现在的编写方式会立即再次轮询。像这样简单的东西:
setTimeout(function () { PollState(task_id); }, 5000);
这将大大减少您的日志记录问题和开销。
关于您的 Django 消息传递问题,您需要通过某种处理来提取那些已完成的任务。一种方法是使用 Notification
模型或类似模型,然后您可以添加一个中间件来获取未读通知并将它们注入消息框架。
感谢 Josh K 关于使用 setTimeout
的提示。不幸的是,我永远无法弄清楚中间件方法,所以我将采用更简单的方法在 poll_state
中发送 HttpResponse ,如下所示:
if data == "SUCCESS":
return HttpResponse(json.dumps({"message":"Upload successful!", "state":"SUCCESS"}, content_type='application/json'))
elif data == "FAILURE":
return HttpResponse(json.dumps({"message":"Error in upload", "state":"FAILURE"}, content_type='application/json'))
目的是根据收到的 JSON 简单地呈现成功或错误消息。现在有新问题,但这些是针对不同问题的。
在我使用 Celery 的 Django 项目中(以及许多其他项目),我有一个 Celery 任务会在后台将文件上传到数据库。我使用轮询来跟踪上传进度并显示上传进度条。以下是一些详细介绍上传过程的片段:
views.py:
from .tasks import upload_task
...
upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings
tasks.py:
from taskman.celery import app, DBTask # taskman is the name of the Django app that has celery.py
from celery import task, current_task
@task(base=DBTask)
def upload_task(datapoints, user, description):
from utils.db.databaseinserter import insertIntoDatabase
for count in insertIntoDatabase(datapoints, user, description):
percent_completion = int(100 * (float(count) / float(len(datapoints))))
current_task.update_state(state='PROGRESS', meta={'percent':percent_completion})
databaseinserter.py:
def insertIntoDatabase(datapoints, user, description):
# iterate through the datapoints and upload them one by one
# at the end of an iteration, yield the number of datapoints completed so far
上传代码一切正常,进度条也正常。但是,我不确定如何发送 Django 消息告诉用户上传已完成(或者,如果出现错误,发送 Django 消息通知用户错误)。上传开始时,我在 views.py:
中执行此操作from django.contrib import messages
...
messages.info(request, "Upload is in progress")
我想在上传成功后做这样的事情:
messages.info(request, "Upload successful!")
我无法在 views.py 中执行此操作,因为 Celery 任务是即刻即弃。在 celery.py 中有没有办法做到这一点?在我的 DBTask
class in celery.py 中,我定义了 on_success
和 on_failure
,那么我可以从那里发送 Django 消息吗?
此外,虽然我的投票在技术上可行,但目前并不理想。目前轮询的工作方式是它会无休止地检查任务,而不管任务是否在进行中。它很快就会淹没服务器控制台日志,我可以想象它会对整体性能产生负面影响。我对编写轮询代码还很陌生,所以我不完全确定最佳实践,以及如何仅在需要时进行轮询。处理不断轮询和服务器日志堵塞的最佳方法是什么?下面是我的投票代码。
views.py:
def poll_state(request):
data = 'Failure'
if request.is_ajax():
if 'task_id' in request.POST.keys() and request.POST['task_id']:
task_id = request.POST['task_id']
task = AsyncResult(task_id)
data = task.result or task.state
if data == 'SUCCESS' or data == 'FAILURE': # not sure what to do here; what I want is to exit the function early if the current task is already completed
return HttpResponse({}, content_type='application/json')
else:
data ='No task_id in the request'
logger.info('No task_id in the request')
else:
data = 'Not an ajax request'
logger.info('Not an ajax request')
json_data = json.dumps(data)
return HttpResponse(json_data, content_type='application/json')
以及对应的jQuery代码:
{% if task_id %}
jQuery(document).ready(function() {
var PollState = function(task_id) {
jQuery.ajax({
url: "poll_state",
type: "POST",
data: "task_id=" + task_id,
}).done(function(task) {
if (task.percent) {
jQuery('.bar').css({'width': task.percent + '%'});
jQuery('.bar').html(task.percent + '%');
}
else {
jQuery('.status').html(task);
};
PollState(task_id);
});
}
PollState('{{ task_id }}');
})
{% endif %}
(最后两个片段主要来自之前关于 Django+Celery 进度条的 Whosebug 问题。)
减少日志记录和开销的最简单方法是在下一次 PollState
调用时设置超时。您的函数现在的编写方式会立即再次轮询。像这样简单的东西:
setTimeout(function () { PollState(task_id); }, 5000);
这将大大减少您的日志记录问题和开销。
关于您的 Django 消息传递问题,您需要通过某种处理来提取那些已完成的任务。一种方法是使用 Notification
模型或类似模型,然后您可以添加一个中间件来获取未读通知并将它们注入消息框架。
感谢 Josh K 关于使用 setTimeout
的提示。不幸的是,我永远无法弄清楚中间件方法,所以我将采用更简单的方法在 poll_state
中发送 HttpResponse ,如下所示:
if data == "SUCCESS":
return HttpResponse(json.dumps({"message":"Upload successful!", "state":"SUCCESS"}, content_type='application/json'))
elif data == "FAILURE":
return HttpResponse(json.dumps({"message":"Error in upload", "state":"FAILURE"}, content_type='application/json'))
目的是根据收到的 JSON 简单地呈现成功或错误消息。现在有新问题,但这些是针对不同问题的。