在芹菜中处理`post_save`信号

handle `post_save` signal in celery

我有一个相当长的 运行ning 任务需要在插入或更新特定模型后执行。

我决定使用 post_save 信号而不是覆盖 save 方法来减少耦合。由于 Django 信号不是异步的,因此我不得不将 运行ning 工作作为一个 Celery 任务来完成(我们的堆栈中已经有了)。

我的信号处理函数的简化版本如下:

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    handle_save_task.apply_async(args=(instance.pk,))

此外,因为作业是异步完成的,所以我传递了对象的主键而不是实例本身。

@app.task(queue='elastic')
def handle_save_task(instance_pk):
    try:
        instance = MyModel.objects.get(pk=instance_pk)
    except ObjectDoesNotExist:
        # Abort
        logger.warning("Saved object was deleted before this task get a chance to be executed [id = %d]" % instance_pk)
    else:
        # Do my things with instance

实际问题是celery任务执行时无法访问新保存的实例。就像它在保存之前执行一样! (信号不是叫post_save吗?真是讽刺)

By "executed before saving" 我的意思是如果它是一个新实例被插入到数据库中,在芹菜任务中我得到一个 DoesNotExist 异常并且在实例已经在数据库中的情况下保存方法被调用来更新它的一些属性我在 celery 任务中得到了具有旧 属性 值的旧实例。

一个解决方法是运行celery任务有几秒的延迟,但显然这不是一个好的解决方案,也不能保证在重负载或长时间网络延迟下的正确执行行为。

我做的是完全错误的还是稍微修改一下就可以让它工作?

这可能是由于您的更新是在事务内执行的。事务在 Celery Task 已经启动后提交,这导致 Celery Task 在 运行.

时看到您的旧值

您可以尝试以下更改:

from django.db import transaction

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    transaction.on_commit(lambda: handle_save_task.apply_async(args=(instance.pk,)))