是否可以在任务执行期间使用 django-celery-results 查询 celery 任务的状态?
Is it possible to query state of a celery tasks using django-celery-results during the execution of a task?
我在我的 Django 应用程序中使用 Celery + RabbitMQ 来排队任务,
我想使用 task_id 和 task_state 跟踪任务的状态。
为此,我创建了一个 TaskModel(Model) 来存储 task_id、task_state 和数据库中的一些附加数据。在任务执行时,一个新的 TaskModel 对象被保存并随着任务的进行而更新。一切正常。
但是,我仍然需要添加很多功能和特性以及错误保护等。那时我想起了 celery 文档中提到的 django-celery-results。
所以我遵循了 django-celery-results 文档说明。任务结果存储在默认的 django 数据库中的专用 table、 但是 只有在任务结束后...而不是在 PENDING、STARTED 状态期间。
是否可以使用 django-celery-results 在 PENDING 和 STARTED 状态下存储和查询任务?或不?
谢谢
查看 django-celery-result 的源代码后发现代码非常简单 straight-forward。
为了在调用任务函数后使用django-celery-result存储任务,请使用以下内容:
from django_celery_results.models import TaskResult
import json
@shared_task(bind=True)
def foo(self, count):
print('hello')
task_result = TaskResult.objects.get(self.request.id)
counter = 1
interval = 20 #Limit updates to reduce database queries
interval_count = count/interval
for i in range(count):
print(i)
if counter>= interval_count:
interval_count+=count/interval
task_result.meta = json.dumps({'progress': counter/count})
task_result.save()
counter+=1
task_result.save()
return
def goo()
task = foo.delay(1000)
task_result = TaskResult(task_id=task.task_id)
task_result.save()
我在我的 Django 应用程序中使用 Celery + RabbitMQ 来排队任务,
我想使用 task_id 和 task_state 跟踪任务的状态。
为此,我创建了一个 TaskModel(Model) 来存储 task_id、task_state 和数据库中的一些附加数据。在任务执行时,一个新的 TaskModel 对象被保存并随着任务的进行而更新。一切正常。
但是,我仍然需要添加很多功能和特性以及错误保护等。那时我想起了 celery 文档中提到的 django-celery-results。
所以我遵循了 django-celery-results 文档说明。任务结果存储在默认的 django 数据库中的专用 table、 但是 只有在任务结束后...而不是在 PENDING、STARTED 状态期间。
是否可以使用 django-celery-results 在 PENDING 和 STARTED 状态下存储和查询任务?或不?
谢谢
查看 django-celery-result 的源代码后发现代码非常简单 straight-forward。
为了在调用任务函数后使用django-celery-result存储任务,请使用以下内容:
from django_celery_results.models import TaskResult
import json
@shared_task(bind=True)
def foo(self, count):
print('hello')
task_result = TaskResult.objects.get(self.request.id)
counter = 1
interval = 20 #Limit updates to reduce database queries
interval_count = count/interval
for i in range(count):
print(i)
if counter>= interval_count:
interval_count+=count/interval
task_result.meta = json.dumps({'progress': counter/count})
task_result.save()
counter+=1
task_result.save()
return
def goo()
task = foo.delay(1000)
task_result = TaskResult(task_id=task.task_id)
task_result.save()