使用 RabbitMQ 更新 Celery 中的任务

Update tasks in Celery with RabbitMQ

我在我的 django 项目中使用 Celery 创建任务以在将来的特定时间发送电子邮件。用户可以使用 notify_on 日期时间字段创建通知实例。然后我将 notify_on 的值作为 eta.

传递
class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()


def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance,), eta=instance.notify_on)

signals.post_save.connect(notification_post_save, sender=Notification)

该方法的问题在于,如果 notify_on 将被用户更改,他将收到两个(或更多)通知而不是一个。

问题是我如何更新与特定通知关联的任务,或者以某种方式删除旧的并创建新的。

首先,使用post_save,我们无法获取旧数据。所以,在这里我重写了 Notification 模型的 save() 方法。除此之外,创建一个字段来存储芹菜task_id。

<b>from celery.task.control import revoke</b>


class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()
    <b>celery_task_id = models.CharField(max_length=100)

    def save(self, *args, **kwargs):
        pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
        super().save(*args, **kwargs)
        post_notify_on = self.notify_on
        if not self.celery_task_id:  # initial task creation
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
        elif pre_notify_on != post_notify_on:
            # revoke the old task
            revoke(self.celery_task_id, terminate=True)
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)</b>

参考

  1. Cancel an already executing task with Celery?
  2. Django: How to access original (unmodified) instance in post_save signal

我觉得之前的任务没有必要删除。您只需验证正在执行的任务是最后一个任务。为此,创建一个名为 checksum 的新字段,它是一个 UUID 字段,每次更改 notify_on 时更新该字段。在发送电子邮件的任务中检查此校验和。

class Notification(models.Model):
    checksum = models.UUIDField(default=uuid.uuid4)
    notify_on = models.DateTimeField()

def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance.id, str(instance.checksum)),eta=instance.notify_on)

signals.post_save.connect(notification_post_save, sender=Notification)


@shared_task 
def send_notification(notification_id, checksum):
    notification = Notification.objects.get(id=notification_id)
    if str(notification.checksum) != checksum:
        return False
    #send email

另外,请不要每次都在通知对象上发送信号,只需在 notify_on 更改时发送即可。你也可以检查这个