Python Celery:状态改变后更新 Django 模型
Python Celery: Update django model after state change
我设法找到了 2 个类似的主题来讨论这个问题,但不幸的是我无法从中得到最好的解决方案:
- Update Django Model Field Based On Celery Task Status
- Update Django Model Field Based On Celery Task Status
我使用 Django 和 Celery(+redis 作为消息代理),我想在 celery 任务状态发生变化(从挂起 -> 成功,挂起 -> 失败)等时更新 Django 模型
我的代码:
import time
from celery import shared_task
@shared_task(name="run_simulation")
def run_simulation(simulation_id: str):
t1_start = time.perf_counter()
doSomeWork() # we may change this to sleep for instance
t1_end = time.perf_counter()
return{'process_time': t1_end - t1_start}
以及我调用任务的特定视图:
def run_simulation(request):
form = SimulationForm(request.POST)
if form.is_valid():
new_simulation = form.save()
new_simulation.save()
task_id = tasks.run_simulation.delay(new_simulation.id)
问题是,当任务状态发生变化时,更新 Simulation 的 Django 模型状态的首选方法是什么?
在文档中我找到了使用方法 on_failure
、on_success
等的处理程序 http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers
我不认为有一个首选的方法来做这样的事情,因为它取决于你的项目。
您可以使用您发送的 link 之类的监控任务。给任务一个任务id,重新调度任务,直到被监控的任务处于FINISHED状态。
from celery import AsyncResult
@app.task(bind=True)
def monitor_task(self, t_id):
"""Monitor a task"""
res = AsyncResult(t_id, backend=self.backend, app=self.app)
if res.ready():
raise self.retry(
countdown=10,
exc=Exception("Main task not done yet.")
)
您还可以创建一个事件接收器并检查任务的状态,然后将其保存在数据库中。
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing
现在,如果您只对成功和失败状态感兴趣,那么您可以创建成功和失败回调,并注意在数据库中保存成功或失败状态。
tasks.run_simulation.apply_async(
(sim_id,),
link=tasks.success_handler.s(),
link_error=tasks.error_handler()
)
http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks
我设法找到了 2 个类似的主题来讨论这个问题,但不幸的是我无法从中得到最好的解决方案:
- Update Django Model Field Based On Celery Task Status
- Update Django Model Field Based On Celery Task Status
我使用 Django 和 Celery(+redis 作为消息代理),我想在 celery 任务状态发生变化(从挂起 -> 成功,挂起 -> 失败)等时更新 Django 模型
我的代码:
import time
from celery import shared_task
@shared_task(name="run_simulation")
def run_simulation(simulation_id: str):
t1_start = time.perf_counter()
doSomeWork() # we may change this to sleep for instance
t1_end = time.perf_counter()
return{'process_time': t1_end - t1_start}
以及我调用任务的特定视图:
def run_simulation(request):
form = SimulationForm(request.POST)
if form.is_valid():
new_simulation = form.save()
new_simulation.save()
task_id = tasks.run_simulation.delay(new_simulation.id)
问题是,当任务状态发生变化时,更新 Simulation 的 Django 模型状态的首选方法是什么?
在文档中我找到了使用方法 on_failure
、on_success
等的处理程序 http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers
我不认为有一个首选的方法来做这样的事情,因为它取决于你的项目。 您可以使用您发送的 link 之类的监控任务。给任务一个任务id,重新调度任务,直到被监控的任务处于FINISHED状态。
from celery import AsyncResult
@app.task(bind=True)
def monitor_task(self, t_id):
"""Monitor a task"""
res = AsyncResult(t_id, backend=self.backend, app=self.app)
if res.ready():
raise self.retry(
countdown=10,
exc=Exception("Main task not done yet.")
)
您还可以创建一个事件接收器并检查任务的状态,然后将其保存在数据库中。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing
现在,如果您只对成功和失败状态感兴趣,那么您可以创建成功和失败回调,并注意在数据库中保存成功或失败状态。
tasks.run_simulation.apply_async(
(sim_id,),
link=tasks.success_handler.s(),
link_error=tasks.error_handler()
)
http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks