Python Celery:状态改变后更新 Django 模型

Python Celery: Update django model after state change

我设法找到了 2 个类似的主题来讨论这个问题,但不幸的是我无法从中得到最好的解决方案:

  1. Update Django Model Field Based On Celery Task Status
  2. Update Django Model Field Based On Celery Task Status

我使用 Django 和 Celery(+redis 作为消息代理),我想在 celery 任务状态发生变化(从挂起 -> 成功,挂起 -> 失败)等时更新 Django 模型

我的代码:

import time

from celery import shared_task

@shared_task(name="run_simulation")
def run_simulation(simulation_id: str):
    t1_start = time.perf_counter()
    doSomeWork() # we may change this to sleep for instance
    t1_end = time.perf_counter()
    return{'process_time': t1_end - t1_start}

以及我调用任务的特定视图:

def run_simulation(request):
    form = SimulationForm(request.POST)
    if form.is_valid():
        new_simulation = form.save()
        new_simulation.save()
        task_id = tasks.run_simulation.delay(new_simulation.id)

问题是,当任务状态发生变化时,更新 Simulation 的 Django 模型状态的首选方法是什么?

在文档中我找到了使用方法 on_failureon_success 等的处理程序 http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

我不认为有一个首选的方法来做这样的事情,因为它取决于你的项目。 您可以使用您发送的 link 之类的监控任务。给任务一个任务id,重新调度任务,直到被监控的任务处于FINISHED状态。

from celery import AsyncResult

@app.task(bind=True)
def monitor_task(self, t_id):
    """Monitor a task"""
    res = AsyncResult(t_id, backend=self.backend, app=self.app)
    if res.ready():
        raise self.retry(
            countdown=10,
            exc=Exception("Main task not done yet.")
        )

您还可以创建一个事件接收器并检查任务的状态,然后将其保存在数据库中。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing

现在,如果您只对成功和失败状态感兴趣,那么您可以创建成功和失败回调,并注意在数据库中保存成功或失败状态。

tasks.run_simulation.apply_async(
    (sim_id,),
    link=tasks.success_handler.s(),
    link_error=tasks.error_handler()
)

http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks