RabbitMQ 中的更新队列

updated queue in RabbitMQ

我有一个 task 将在对象的 POSTPATCH 之后执行,比如 ABC

代码如下:

models.py

class ABC(models.Model):
    start_at = models.DateTimeField()

task.py

@app.task
def do_something(obj_id):
    try:
        abc = ABC.objects.get(id=obj_id)
    except ObjectDoesNotExist:
        return

    eta = settings.ETA
    do_something.apply_async([abc.id], eta=eta)
    return

views.py

class ABCPost(CreateAPIView):
    serializer_class = ABCSerializer

    def post(self, request):
        # create object
        # call task do_something
    
    def patch(self, request):
        # update the `start_at` field for ABC
        # call task do_something

因此,当字段更新时,排队的消息应该在更新后的 start_at 值处执行。但是,使用上面的代码,两条消息排队等待相同的对象但时间戳不同。我怎样才能避免这种情况?

提前致谢

创建对象时需要存储 task_id :

abc = ABC.objects.get(pk=PK)
abc.task_id = do_something.apply_async([abc.pk], eta=eta).id
abc.save(updated_fields=['task_id'])

将此字段添加到 ABC 模型,并将其存储在其中。

打补丁的时候,如果第一个任务还没有执行,需要撤销:

from celery.result import AsyncResult
abc = ABC.objects.get(pk=PK)
AsyncResult(abc.task_id).revoke()
abc.task_id = do_something.apply_async([abc.pk], eta=eta).id
abc.save(updated_fields=['task_id'])

这样你仍然会有 2 条消息在队列中,但第一个消息在执行时会被跳过(撤销并没有从队列中删除,只是弹出时不执行它)